scli/scli

5647 lines
209 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import argparse
import atexit
import base64
import bisect
import collections
import contextlib
import errno
import importlib
import json
import logging
import os
import pprint
import re
import resource
import shlex
import shutil
import signal as signal_ipc
import subprocess
import sys
import tempfile
import textwrap
from abc import ABC, abstractmethod
from datetime import datetime, timezone
import urwid
try:
from urwid_readline import ReadlineEdit
Edit = ReadlineEdit
except ImportError:
Edit = urwid.Edit
# #############################################################################
# constants
# #############################################################################
# Base directories: taken from the XDG environment variables when set,
# otherwise the conventional locations under the user's home directory.
DATA_FOLDER = os.getenv('XDG_DATA_HOME', os.path.expanduser('~/.local/share'))
CFG_FOLDER = os.getenv('XDG_CONFIG_HOME', os.path.expanduser('~/.config'))
# "Legacy" signal-cli locations, kept under the config folder.
SIGNALCLI_LEGACY_FOLDER = os.path.join(CFG_FOLDER, 'signal')
SIGNALCLI_LEGACY_DATA_FOLDER = os.path.join(SIGNALCLI_LEGACY_FOLDER, 'data')
SIGNALCLI_LEGACY_ATTACHMENT_FOLDER = os.path.join(SIGNALCLI_LEGACY_FOLDER, 'attachments')
# signal-cli locations under the data folder.
SIGNALCLI_FOLDER = os.path.join(DATA_FOLDER, 'signal-cli')
SIGNALCLI_DATA_FOLDER = os.path.join(SIGNALCLI_FOLDER, 'data')
SIGNALCLI_ATTACHMENT_FOLDER = os.path.join(SIGNALCLI_FOLDER, 'attachments')
SIGNALCLI_AVATARS_FOLDER = os.path.join(SIGNALCLI_FOLDER, 'avatars')
SIGNALCLI_STICKERS_FOLDER = os.path.join(SIGNALCLI_FOLDER, 'stickers')
# Files and folders owned by scli itself.
SCLI_DATA_FOLDER = os.path.join(DATA_FOLDER, 'scli')
SCLI_ATTACHMENT_FOLDER = os.path.join(SCLI_DATA_FOLDER, 'attachments')
SCLI_HISTORY_FILE = os.path.join(SCLI_DATA_FOLDER, 'history')
SCLI_CFG_FILE = os.path.join(CFG_FOLDER, 'sclirc')
SCLI_LOG_FILE = os.path.join(SCLI_DATA_FOLDER, 'log')
# Directory containing this script (symlinks resolved) and the README next to it.
SCLI_EXEC_FOLDER = os.path.dirname(os.path.realpath(__file__))
SCLI_README_FILE = os.path.join(SCLI_EXEC_FOLDER, 'README.md')
# #############################################################################
# utility
# #############################################################################
def noop(*_args, **_kwargs):
    """Accept any arguments, do nothing, and return None."""
    return None
def get_nested(dct, *keys, default=None):
    """Follow `keys` one by one into a nested container.

    Returns the value reached after applying every key in turn, or `default`
    as soon as one lookup fails with KeyError, TypeError or IndexError.
    With no keys, `dct` itself is returned.
    """
    current = dct
    for step in keys:
        try:
            current = current[step]
        except (KeyError, TypeError, IndexError):
            return default
    return current
def get_urls(txt):
    """Return every http(s) URL found in `txt`, in order of appearance.

    A URL runs from `http://` or `https://` up to the next whitespace character.
    """
    url_matches = re.finditer(r'(https?://[^\s]+)', txt)
    return [found.group(1) for found in url_matches]
def callf(cmd, rmap=None, background=False, **subprocess_kwargs):
    """Run the command string `cmd`, after substituting placeholders.

    Args:
        cmd: Command line as a single string.
        rmap: Optional mapping `placeholder -> replacement`. Every placeholder
            must occur in `cmd`, unless it is listed under the special
            `"_optionals"` key. The mapping is not modified.
        background: If true, start the process with `Popen` (std streams
            default to DEVNULL) and return immediately.
        **subprocess_kwargs: Passed on to `subprocess.Popen` / `subprocess.run`.
            Unless `shell` is set, `cmd` is split with `shlex.split`.

    Returns:
        The `Popen` object (background) or the `CompletedProcess` (foreground).

    Raises:
        ValueError: A required placeholder is missing from `cmd`.
    """
    if rmap:
        # Work on a copy: popping "_optionals" must not alter the caller's dict.
        replacements = dict(rmap)
        optionals = replacements.pop("_optionals", ())
        for key, val in replacements.items():
            if key not in cmd and key not in optionals:
                raise ValueError(f'Command string `{cmd}` should contain a replacement placeholder `{key}` (e.g. `some-cmd "{key}"`). See `--help`.')
            cmd = cmd.replace(key, val)

    if not subprocess_kwargs.get('shell'):
        cmd = shlex.split(cmd)
    logging.debug('callf: `%s`', cmd)

    if background:
        for arg in ('stdin', 'stdout', 'stderr'):
            subprocess_kwargs.setdefault(arg, subprocess.DEVNULL)
        return subprocess.Popen(cmd, **subprocess_kwargs)

    subprocess_kwargs.setdefault('text', True)
    proc = subprocess.run(cmd, **subprocess_kwargs)
    if proc.returncode != 0:
        logging.error(
            'callf: %s: exit code: %d, stderr: %s',
            proc.args,
            proc.returncode,
            proc.stderr
        )
    elif proc.stdout:
        logging.debug('callf: %s', proc.stdout)
    return proc
def get_prog_dir():
    """Return the directory that contains this file, with symlinks resolved."""
    resolved_file = os.path.realpath(__file__)
    return os.path.dirname(resolved_file)
def get_version():
    """Get this program's version.

    Based on either `git describe`, or, if not available (e.g. for a release
    downloaded without the `.git` dir), use VERSION file populated during the
    creation of the release.
    Does not output the leading `v` if it's present in git tag's name.

    Returns:
        The version string without surrounding newlines, or `'?'` when it
        cannot be determined.
    """
    # Do not use `logging` in this function, as it's called before logging.basicConfig().
    prog_dir = get_prog_dir()
    git_cmd = ['git', '--git-dir', os.path.join(prog_dir, '.git'), 'describe']
    try:
        proc = subprocess.run(git_cmd, capture_output=True, check=True, text=True)
    except (FileNotFoundError, subprocess.CalledProcessError):
        pass  # No git executable or not a git checkout: fall back to VERSION file.
    else:
        return proc.stdout.strip('v\n')

    version_file_path = os.path.join(prog_dir, 'VERSION')
    try:
        with open(version_file_path, encoding='utf-8') as f:
            # Drop the line terminator, so the result matches the git-based one.
            version_str = f.readline().rstrip()
    except OSError:
        return '?'
    if not version_str.startswith('v'):
        # '$Format:...' - not a `git archive` (e.g. a manually dl'ed blob)
        # '%(..)' - `git archive` if git < 2.32
        return '?'
    return version_str[1:]  # `git-describe`-like string
def get_default_editor():
    """Pick an editor command.

    The `VISUAL` and `EDITOR` environment variables are consulted first (in
    that order); otherwise the first of a fixed list of executables found on
    PATH is returned as a full path. Returns None when nothing is found.
    """
    from_env = (os.getenv(name) for name in ('VISUAL', 'EDITOR'))
    for candidate in from_env:
        if candidate is not None:
            return candidate

    known_editors = ('sensible-editor', 'editor', 'nano', 'emacs', 'vi')
    for executable in known_editors:
        located = shutil.which(executable)
        if located is not None:
            return located
    return None
# A `+`, a non-zero leading digit, then 6 to 14 more digits.
PHONE_NUM_REGEX = re.compile(r'^\+[1-9][0-9]{6,14}$')
# https://github.com/signalapp/libsignal-service-java/blob/master/java/src/main/java/org/whispersystems/signalservice/api/util/PhoneNumberFormatter.java


def is_number(number):
    """Return True if `number` is, in its entirety, a phone number matching PHONE_NUM_REGEX.

    `fullmatch` is used because `$` alone would also accept a trailing newline.
    """
    return PHONE_NUM_REGEX.fullmatch(number) is not None
def is_path(path):
    """Return True if `path` begins with `/`, `~/` or `./`."""
    path_prefixes = ("/", "~/", "./")
    return any(path.startswith(prefix) for prefix in path_prefixes)
PATH_RE = re.compile(
    r"""
    # Matches a path-like string, with whitespaces escaped or with the whole path in quotes.
    (
        (
            \\\ |           # escaped whitespace OR ..
            [^'" ]          # .. not a quote or space
        )+
    )                       # Path with escaped whitespace ..
    |                       # .. OR ..
    (                       # .. path in quotes.
        (?P<quote>['"])     # a quote char; name the capture
        .+?                 # anything, non-greedily
        (?P=quote)          # matching quote
    )
    """,
    re.VERBOSE,
)


def split_path(string):
    """Split a leading path off `string`.

    The path is either unquoted (spaces escaped as `\\ `) or wrapped in a pair
    of matching quotes.

    Returns:
        `['', '']` for an empty / blank string;
        `['', string]` if the string does not start with a path;
        `[path, rest]`, or just `[path]` when nothing follows the path.
        Escapes and the enclosing quotes are removed from `path`.
    """
    string = string.strip()
    if not string:
        return ['', '']
    re_match = PATH_RE.match(string)
    if not re_match:
        return ['', string]
    path = re_match.group()
    if re_match.group(1):  # unquoted path
        path = path.replace(r'\ ', ' ')
    else:  # path in quotes
        # Remove only the delimiting pair: a quote char of the other kind at
        # either end is part of the path itself.
        path = path[1:-1]
    rest = string[re_match.end():].strip()
    return [path, rest] if rest else [path]
def get_current_timestamp_ms():
    """Return the current time as an integer number of milliseconds since the epoch."""
    now = datetime.now()
    seconds = now.timestamp()
    return int(seconds * 1000)
def utc2local(utc_dt):
    """Label `utc_dt` as UTC (overwriting any tzinfo) and convert it to the system's local timezone."""
    aware_utc = utc_dt.replace(tzinfo=timezone.utc)
    return aware_utc.astimezone(tz=None)
def strftimestamp(timestamp, strformat='%H:%M:%S (%Y-%m-%d)'):
    """Format an epoch timestamp as local time.

    Args:
        timestamp: Seconds since the epoch; a value too large to be a valid
            date in seconds is retried as milliseconds.
        strformat: `strftime` format string.

    Returns:
        The formatted local-time string.
    """
    try:
        date = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    except (ValueError, OverflowError, OSError):
        # Out-of-range values raise any of these, depending on magnitude and
        # platform; treat the value as milliseconds instead.
        date = datetime.fromtimestamp(timestamp / 1000, tz=timezone.utc)
    return date.astimezone(tz=None).strftime(strformat)
def strip_non_printable_chars(string):
    """Return `string` with every character for which `str.isprintable()` is false removed."""
    if not string.isprintable():
        return ''.join(filter(str.isprintable, string))
    # Nothing to strip: hand back the same object.
    return string
# #############################################################################
# signal utility
# #############################################################################
def get_contact_id(contact_dict):
    """Return the contact's `number` if it is truthy, otherwise its `groupId` (None if absent)."""
    number = contact_dict.get('number')
    if number:
        return number
    return contact_dict.get('groupId')
def is_contact_group(contact_dict):
    """Return True if `contact_dict` has a `groupId` key."""
    has_group_id = 'groupId' in contact_dict
    return has_group_id
def is_group_v2(group_dict):
    """Return True if the group's `groupId` is exactly 44 characters long."""
    return len(group_dict['groupId']) == 44
def get_envelope_data_val(envelope, *keys, default=None, return_tuple=False):
    """Look up `keys` under both `dataMessage` and `syncMessage.sentMessage`.

    With `return_tuple`, both results are returned as
    `(data_message_value, sync_message_value)`; otherwise the first truthy one
    of the two (or the second if neither is truthy). Missing values are
    replaced by `default`.
    """
    from_data_message = get_nested(envelope, 'dataMessage', *keys, default=default)
    from_sync_message = get_nested(envelope, 'syncMessage', 'sentMessage', *keys, default=default)
    if not return_tuple:
        return from_data_message or from_sync_message
    return (from_data_message, from_sync_message)
def is_envelope_outgoing(envelope):
    """Return True if the envelope has a `target` key, a `syncMessage.sentMessage`, or a `callMessage.answerMessage`."""
    if 'target' in envelope:
        return True
    if get_nested(envelope, 'syncMessage', 'sentMessage') is not None:
        return True
    return get_nested(envelope, 'callMessage', 'answerMessage') is not None
def is_envelope_group_message(envelope):
    """Return True if the envelope refers to a group.

    That is the case when it carries `groupInfo`, when its `target` is not a
    phone number, or when it is a typing message with a `groupId`.
    """
    if get_envelope_data_val(envelope, 'groupInfo') is not None:
        return True
    if 'target' in envelope and not is_number(envelope['target']):
        return True
    return get_nested(envelope, 'typingMessage', 'groupId') is not None
def get_envelope_msg(envelope):
    """Return the envelope's message text.

    If the `message` field is absent from the envelope: return None. If it is
    present but contains no text (since signal-cli v0.6.8, this is represented
    as `'message': null`): return ''. Otherwise: return the `message` field's
    value.
    """
    # `0` marks "field absent", to tell it apart from an explicit null.
    data_msg, sync_msg = get_envelope_data_val(envelope, 'message', default=0, return_tuple=True)
    for candidate in (data_msg, sync_msg):
        if candidate is None:
            return ''
        if candidate != 0:
            return candidate
    return None
def get_envelope_time(envelope):
    """Return the envelope's top-level `timestamp` if truthy, else the one of its data / sync message."""
    top_level = envelope['timestamp']
    if top_level:
        return top_level
    return get_envelope_data_val(envelope, 'timestamp')
def get_envelope_contact_id(envelope):
    """Return the id of the conversation this envelope belongs to.

    The first truthy value of the following is used: `target`, the group id of
    the data / sync message, the sync message's `destination`, the typing
    message's `groupId`, `sourceNumber`; as a last resort `envelope['source']`.
    """
    contact_id = envelope.get('target')
    if contact_id:
        return contact_id
    contact_id = get_envelope_data_val(envelope, 'groupInfo', 'groupId')
    if contact_id:
        return contact_id
    contact_id = get_nested(envelope, 'syncMessage', 'sentMessage', 'destination')
    if contact_id:
        return contact_id
    contact_id = get_nested(envelope, 'typingMessage', 'groupId')
    if contact_id:
        return contact_id
    contact_id = envelope.get('sourceNumber')
    if contact_id:
        return contact_id
    return envelope['source']
def get_envelope_sender_id(envelope):
    """Return the envelope's `source` field."""
    sender = envelope['source']
    return sender
def get_envelope_quote(envelope):
    """Return the `quote` entry of the envelope's data / sync message (None if absent)."""
    quote = get_envelope_data_val(envelope, 'quote')
    return quote
def get_envelope_reaction(envelope):
    """Return the `reaction` entry of the envelope's data / sync message (None if absent)."""
    reaction = get_envelope_data_val(envelope, 'reaction')
    return reaction
def get_envelope_mentions(envelope):
    """Return the `mentions` entry of the envelope's data / sync message (None if absent)."""
    mentions = get_envelope_data_val(envelope, 'mentions')
    return mentions
def get_envelope_remote_delete(envelope):
    """Return the `remoteDelete` entry of the envelope's data / sync message (None if absent)."""
    remote_delete = get_envelope_data_val(envelope, 'remoteDelete')
    return remote_delete
def get_envelope_sticker(envelope):
    """Return the `sticker` entry of the envelope's data / sync message (None if absent)."""
    sticker = get_envelope_data_val(envelope, 'sticker')
    return sticker
def get_envelope_attachments(envelope):
    """Return the `attachments` entry of the envelope's data / sync message (None if absent)."""
    attachments = get_envelope_data_val(envelope, 'attachments')
    return attachments
def get_attachment_name(attachment):
    """Return a display name for an attachment.

    For a dict: its `filename`, or its `contentType` when the filename is
    empty / None. For anything else (a path): the path's base name.
    """
    if not isinstance(attachment, dict):
        return os.path.basename(attachment)
    return attachment['filename'] or attachment['contentType']
def get_attachment_path(attachment):
    """Return the file path of an attachment.

    An attachment that cannot be indexed by `'id'` (e.g. a plain path string)
    is returned unchanged. Otherwise the id is looked up in signal-cli's
    attachment folder; if no such file exists, the path inside the legacy
    attachment folder is returned (without checking that it exists).
    """
    try:
        attachment_id = attachment['id']
    except TypeError:
        return attachment
    current_path = os.path.join(SIGNALCLI_ATTACHMENT_FOLDER, attachment_id)
    if os.path.exists(current_path):
        return current_path
    return os.path.join(SIGNALCLI_LEGACY_ATTACHMENT_FOLDER, attachment_id)
def get_sticker_file_path(sticker):
    """Return `<stickers folder>/<packId>/<stickerId>` for the given sticker dict."""
    return os.path.join(
        SIGNALCLI_STICKERS_FOLDER,
        sticker['packId'],
        str(sticker['stickerId']),
    )
def b64_to_bytearray(group_id):
    """Decode a base64 string and return its bytes as comma-separated decimal numbers."""
    raw_bytes = base64.b64decode(group_id.encode())
    return ','.join(map(str, raw_bytes))
def b64_to_hex_str(group_id):
    """Decode a base64 string and return its bytes as a lowercase hex string."""
    raw_bytes = base64.b64decode(group_id.encode())
    return raw_bytes.hex()
def hex_str_to_b64(hex_str):
    """Convert a hex string to the base64 encoding of the bytes it represents."""
    raw_bytes = bytes.fromhex(hex_str)
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode()
# #############################################################################
# clipboard
# #############################################################################
# File name prefix of temporary files created for clipboard images.
TEMPFILE_PREFIX = '_scli-tmp.'


class ClipBase(ABC):
    """Common logic for reading files from / writing text to the system clipboard.

    Subclasses bind it to a concrete clipboard tool by providing the
    `_get_types_arg`, `_get_files_arg`, `_put_cmd` attributes and the
    `_run_cmd()`, `_get_args()` methods.
    """

    # Clipboard content types that can be turned into attachments.
    mime_order = ['image/png', 'image/jpeg', 'image/jpg', 'text/uri-list']

    @staticmethod
    def get_installed_clipb_manager():
        """Return the name of the first supported clipboard tool found on PATH, or None."""
        for executable in ('wl-paste', 'xclip'):
            if shutil.which(executable) is not None:
                return executable
        return None

    _get_types_arg = None  # argument(s) for listing the available content types
    _get_files_arg = None  # argument(s) for fetching the `text/uri-list` content
    _put_cmd = None        # command that reads new clipboard content from its stdin

    @abstractmethod
    def _run_cmd(self, args):
        """Return the full command (argument list) that outputs clipboard data for `args`."""

    @abstractmethod
    def _get_args(self, mime):
        """Return the value to pass to `_run()` for fetching content of type `mime`."""

    def _run(self, args):
        """Run the clipboard command; return its stdout as bytes, or None on failure."""
        try:
            proc = subprocess.run(
                self._run_cmd(args),
                capture_output=True,
                check=True,
            )
        except (OSError, subprocess.CalledProcessError):
            return None
        return proc.stdout

    def _get(self, mime):
        """Return the raw clipboard content of type `mime` (None on failure)."""
        return self._run(self._get_args(mime))

    def _run_lines(self, args):
        """Return the command's output as a list of lines; None if it failed or printed nothing."""
        out = self._run(args)
        if out:
            return out.decode('utf-8').splitlines()
        return None

    def _get_types(self):
        """Return the content types currently offered by the clipboard (or None)."""
        return self._run_lines(self._get_types_arg)

    def _get_files(self):
        """Return the non-empty lines of the clipboard's URI list; `[]` if unavailable."""
        # `_run_lines()` returns None when the command fails or prints nothing.
        return [f for f in self._run_lines(self._get_files_arg) or [] if f]

    def _clip_files(self):
        """Return clipboard content as a list of file paths, or None if there is none.

        An image is written to a new file: inside SCLI_ATTACHMENT_FOLDER if
        history is saved (`cfg.save_history`), otherwise to a temporary file
        named with TEMPFILE_PREFIX. For a URI list, the `file://` prefixes are
        removed from the entries.
        """
        offered_types = self._get_types()
        if offered_types is None:
            return None
        for otype in offered_types:
            if otype not in self.mime_order:
                continue
            if otype.startswith('image/'):
                content = self._get(otype)
                if content is None:
                    # Fetching the image failed; there is nothing to write.
                    continue
                suffix = '.' + otype.split('/')[1]
                if cfg.save_history:
                    clip_file_path = os.path.join(
                        SCLI_ATTACHMENT_FOLDER,
                        f"clipboard_{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}{suffix}"
                    )
                    clip_file = open(clip_file_path, 'w+b')
                else:
                    clip_file = tempfile.NamedTemporaryFile(
                        mode='w+b',
                        prefix=TEMPFILE_PREFIX,
                        suffix=suffix,
                        delete=False,
                    )
                with clip_file:
                    clip_file.write(content)
                return [clip_file.name]
            if otype == 'text/uri-list':
                return [x.replace('file://', '') for x in self._get_files()]
        return None

    def files(self):
        """Return the clipboard's files, using the user-configured command if there is one."""
        cmd = cfg.clipboard_get_command
        if cmd is None:
            return self._clip_files()
        return callf(cmd, capture_output=True).stdout.splitlines()

    def _put(self, txt):
        """Write `txt` to the clipboard by piping it into `_put_cmd`; empty text is ignored."""
        if not txt:
            return
        try:
            proc = subprocess.Popen(
                self._put_cmd,
                stdin=subprocess.PIPE,
                text=True,
            )
        except OSError:
            return
        else:
            # Leaving the `with` block closes stdin and waits for the process.
            with proc:
                proc.stdin.write(txt)

    def put(self, txt):
        """Copy `txt` to the clipboard, using the user-configured command if there is one."""
        cmd = cfg.clipboard_put_command
        if cmd is None:
            return self._put(txt)
        return callf(cmd, {'%s': txt})
class Xclip(ClipBase):
    """Clipboard access through the `xclip` executable."""

    _get_types_arg = 'TARGETS'
    _get_files_arg = 'text/uri-list'
    _put_cmd = ['xclip', '-selection', 'clipboard']

    def _run_cmd(self, args):
        """Build the `xclip` command that prints the clipboard content of target `args`."""
        command = ['xclip', '-selection', 'clipboard']
        command += ['-t', args, '-o']
        return command

    def _get_args(self, mime):
        """The target name is the MIME type itself."""
        return mime
class WLclip(ClipBase):
    """Clipboard access through the `wl-paste` / `wl-copy` executables."""

    _get_types_arg = ['-l']
    _get_files_arg = ['-t', 'text/uri-list']
    _put_cmd = ['wl-copy']

    def _run_cmd(self, args):
        """Build the `wl-paste` command with the given extra arguments."""
        command = ['wl-paste']
        command.extend(args)
        return command

    def _get_args(self, mime):
        """Return the `wl-paste` arguments selecting content of type `mime`."""
        type_flag = '-t'
        return [type_flag, mime]
# #############################################################################
# AsyncProc & Daemon
# #############################################################################
class AsyncProc:
def __init__(self, main_loop):
# The `main_loop` is an object like `urwid.MainLoop`, that implements `watch_pipe()` and `set_alarm_in()` methods.
self.main_loop = main_loop
def _on_proc_started(self, proc): pass
def _on_proc_done(self, proc): pass
def run(self, args, shell, callback, callback_args, callback_kwargs):
""" Run the command composed of `args` in the background (asynchronously); run the `callback` function when it finishes """
def watchpipe_handler(line):
# This function is run when the shell process returns (finishes execution).
# The `line` printed to watch pipe is of the form "b'<PID> <RETURN_CODE>\n'"
_proc_pid, return_code = [int(i) for i in line.decode().split()]
proc.wait() # reap the child process, to prevent zombies
proc.returncode = return_code # overwrite the 'wrapper' command return code (always 0) with the actual command return code
proc.output = proc.stderr.read().rstrip('\n') # stderr stream is not seekable, so can be read only once
proc.stderr.close()
if return_code != 0:
logging.error(
'proc: cmd:`%s`; return_code:%d; output:"%s"',
proc.args,
return_code,
proc.output,
)
if callback is not None:
callback(proc, *callback_args, **callback_kwargs)
self._on_proc_done(proc)
os.close(watchpipe_fd) # Close the write end of watch pipe.
return False # Close the read end of watch pipe and remove the watch from event_loop.
watchpipe_fd = self.main_loop.watch_pipe(watchpipe_handler)
# If the command is run with Popen(.., shell=True), shlex.quote is needed to escape special chars in args.
sh_command = " ".join(
[shlex.quote(arg) for arg in args] if not shell else ['{', args, ';', '}']
)
# Redirect all the process's output to stderr, and write the process PID and exit status to the watch pipe.
sh_command += " 1>&2; echo $$ $?"
proc = subprocess.Popen(
sh_command,
shell=True,
stdout=watchpipe_fd,
stderr=subprocess.PIPE,
universal_newlines=True,
)
atexit.register(proc.kill) # prevent orphaned processes surviving after the main program is stopped
self._on_proc_started(proc)
return proc
class AsyncQueued(AsyncProc):
    """`AsyncProc` with a cap on the number of simultaneously running processes.

    Runs requested while the cap is reached are queued, and started one by one
    as the running processes finish.
    """

    _MAX_BACKGROUND_PROCS_DEFAULT = 64

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._curr_running = set()            # processes currently running
        self._run_queue = collections.deque()  # arguments of the runs waiting for a free slot
        try:
            self._max_background_procs = self._background_procs_resource(self._MAX_BACKGROUND_PROCS_DEFAULT)
        except OSError:
            self._max_background_procs = self._MAX_BACKGROUND_PROCS_DEFAULT

    @staticmethod
    def _soft_limit(resource_name):
        """Return the soft limit of `resource.<resource_name>`.

        Returns None if the resource is not defined on this platform, cannot
        be queried, or is unlimited.
        """
        res = getattr(resource, resource_name, None)
        if res is None:
            return None
        try:
            soft_limit = resource.getrlimit(res)[0]
        except (OSError, ValueError):
            return None
        # "Unlimited" is reported as RLIM_INFINITY (a negative number on some
        # platforms), which must not take part in the `min()` computations.
        if soft_limit == resource.RLIM_INFINITY or soft_limit < 0:
            return None
        return soft_limit

    @staticmethod
    def _background_procs_resource(default):
        """Return the max number of background processes (at least 1, at most `default`) that fits the process and file descriptor limits."""
        nproc_limit = AsyncQueued._soft_limit('RLIMIT_NPROC')
        nprocs = default if nproc_limit is None else min(default, nproc_limit)
        try:
            n_curr_fds = len(os.listdir('/proc/self/fd')) * 3  # x3 to account for signal-cli's added FDs after it starts
        except OSError:
            n_curr_fds = 32
        fd_limits = [nprocs*3+n_curr_fds]  # each proc opens 3 FDs
        for resource_name in ('RLIMIT_NOFILE', 'RLIMIT_OFILE'):
            fd_limit = AsyncQueued._soft_limit(resource_name)
            if fd_limit is not None:
                fd_limits.append(fd_limit)
        # With a cap below 1, nothing could ever be started.
        return max(1, (min(fd_limits) - n_curr_fds) // 3)

    @property
    def _max_procs_reached(self):
        """True if no more processes may be started right now."""
        return len(self._curr_running) >= self._max_background_procs

    def _add_run_to_queue(self, run_args):
        """Store the arguments of a run that has to wait for a free slot."""
        self._run_queue.append(run_args)

    def run(self, args, callback=None, *callback_args, shell=False, **callback_kwargs):
        """Start the command now if a slot is free, otherwise queue it.

        Returns:
            The started process, or None if the run has been queued.
        """
        # Keyword arguments for `AsyncProc.run()`.
        run_args = {
            'args': args,
            'callback': callback,
            'callback_args': callback_args,
            'shell': shell,
            'callback_kwargs': callback_kwargs,
        }
        if not self._max_procs_reached:
            return super().run(**run_args)
        return self._add_run_to_queue(run_args)

    def _on_proc_started(self, proc):
        super()._on_proc_started(proc)
        self._curr_running.add(proc)

    def _pop_run(self, proc):
        """Forget the finished `proc`; return the arguments of the next queued run, or None."""
        self._curr_running.remove(proc)
        try:
            return self._run_queue.popleft()
        except IndexError:
            return None

    def _on_proc_done(self, proc):
        """Start the next queued run, if any.

        Returns:
            `(run_args, started_proc)` of the run taken off the queue, or
            `(None, None)` if the queue was empty.
        """
        super()._on_proc_done(proc)
        run_args_next = self._pop_run(proc)
        if run_args_next is not None:
            started_proc = super().run(**run_args_next)
        else:
            started_proc = None
        return (run_args_next, started_proc)
class AsyncContext(AsyncQueued):
    """`AsyncQueued` that can run a callback once a whole group of processes is done.

    The group consists of the processes started (or queued) inside a
    `callback_finally()` context.
    """

    class ContextItem:
        """Bookkeeping record of one `callback_finally()` context."""

        __slots__ = (
            'procs',
            'callback',
            'callback_kwargs',
            'proc_callback',
            'proc_callback_kwargs',
            'buffered_runs',
        )

        def __init__(self, callback, callback_kwargs, proc_callback, proc_callback_kwargs):
            self.callback = callback
            self.callback_kwargs = callback_kwargs
            self.proc_callback = proc_callback
            self.proc_callback_kwargs = proc_callback_kwargs
            # Started processes that have not finished yet.
            self.procs = set()
            # Ids of the runs that are still waiting in the queue.
            self.buffered_runs = set()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._accepting_new_procs_for_item = False
        self._context_items = collections.deque()

    def _new_item(self, callback, callback_kwargs, proc_callback, proc_callback_kwargs):
        """Open a new context item, unless there is no callback to run at all."""
        has_callback = callback is not None or proc_callback is not None
        if not has_callback:
            return
        new_item = self.ContextItem(callback, callback_kwargs, proc_callback, proc_callback_kwargs)
        self._context_items.append(new_item)
        self._accepting_new_procs_for_item = True

    @property
    def _curr_item(self):
        """The most recently added context item."""
        return self._context_items[-1]

    def _on_proc_started(self, proc):
        super()._on_proc_started(proc)
        if self._accepting_new_procs_for_item:
            self._curr_item.procs.add(proc)

    def _finalize_item(self):
        """Stop adding processes to the current item; run its callback now if it has nothing pending."""
        self._accepting_new_procs_for_item = False
        if self._context_items:
            last_item = self._curr_item
            if not last_item.procs and not last_item.buffered_runs:
                # No background procs have been started
                self._pop_callback()

    def _pop_callback(self, item=None):
        """Remove `item` (default: the last one) from the context items and run its callback."""
        if item is not None:
            self._context_items.remove(item)
        else:
            item = self._context_items.pop()
        final_callback = item.callback
        if final_callback is not None:
            final_callback(**item.callback_kwargs)

    @contextlib.contextmanager
    def callback_finally(
            self,
            callback=None,
            proc_callback=None,
            proc_callback_kwargs=None,
            **callback_kwargs
    ):
        """Execute callback function after all background processes started inside this context have finished.

        Optionally, run `proc_callback` after every background processes that exits.
        """
        if not proc_callback_kwargs:
            proc_callback_kwargs = {}
        try:
            created = self._new_item(
                callback,
                callback_kwargs,
                proc_callback,
                proc_callback_kwargs,
            )
            yield created
        finally:
            self._finalize_item()

    @staticmethod
    def _run_id(run_params):
        """Return a key identifying a queued run: the identity of its arguments object."""
        return id(run_params)

    def _add_buffered_run(self, run_params):
        """Attach a queued run to the current item, if one is accepting processes."""
        if self._accepting_new_procs_for_item:
            self._curr_item.buffered_runs.add(self._run_id(run_params))

    def _add_run_to_queue(self, run_args):
        super()._add_run_to_queue(run_args)
        self._add_buffered_run(run_args)

    def _remove_buffered_run(self, run_params, started_proc):
        """Replace a formerly queued run with its now started process in the item that owns it."""
        if run_params is None:
            return
        run_id = self._run_id(run_params)
        for item in self._context_items:
            if run_id not in item.buffered_runs:
                continue
            item.buffered_runs.remove(run_id)
            item.procs.add(started_proc)
            # There should be no race condition if the proc has already finished: the python code is executed in a single thread, and this is always run before the proc's return is processed.
            return

    def _on_proc_done(self, proc):
        """Run the callbacks of the context item that `proc` belongs to, if any."""
        next_run_args, next_proc = super()._on_proc_done(proc)
        self._remove_buffered_run(next_run_args, next_proc)
        for item in self._context_items:
            if proc not in item.procs:
                continue
            item.procs.remove(proc)
            if item.proc_callback is not None:
                item.proc_callback(proc, **item.proc_callback_kwargs)
            still_pending = item.procs or item.buffered_runs or self._accepting_new_procs_for_item
            if not still_pending:
                self._pop_callback(item)
            return
class Daemon(AsyncContext):
def __init__(self, main_loop, username):
    """Set up the daemon wrapper for the account `username`; every callback starts out as `noop`."""
    super().__init__(main_loop)
    self._username = username
    self._buffer = ''
    # Paused initially, to prevent a race condition betw registering the dbus service and getting a message on stdout (e.g. while polling in _run_when_dbus_service_started)
    self._msg_processing_paused = True
    callback_names = (
        'daemon_started',
        'daemon_log',
        'receive_message',
        'receive_sync_message',
        'receive_receipt',
        'receive_reaction',
        'receive_sticker',
        'sending_message',
        'sending_reaction',
        'sending_done',
        'sending_reaction_done',
        'contact_typing',
        'call_message',
        'contacts_sync',
        'remote_delete',
        'sending_remote_delete_done',
        'untrusted_identity_err',
        'user_unregistered_err',
    )
    self.callbacks = dict.fromkeys(callback_names, noop)
def start(self):
    """Start the daemon command in the background, with its stdout and stderr watched by the main loop.

    Returns the started process; exits the program if the executable is not found.
    """
    stdout_fd = self.main_loop.watch_pipe(self._daemon_stdout_handler)
    stderr_fd = self.main_loop.watch_pipe(self._daemon_stderr_handler)
    replacements = {'%u': self._username, '_optionals': ['%u']}
    try:
        return callf(
            cfg.daemon_command,
            replacements,
            background=True,
            stdout=stdout_fd,
            stderr=stderr_fd,
            #text=True, # urwid returns bytes-objects anyway; see comment in _daemon_stdout_handler
        )
    except FileNotFoundError:
        sys.exit(
            f"ERROR: could not find `{cfg.daemon_command.split()[0]}` executable. "
            "Make sure it is on system path."
        )
def _daemon_stdout_handler(self, output):
output = self._buffer + output.decode()
# The `output` (supplied by urwid) is a `bytes` object, even when the `subprocess` is launched with `text=True`.
if self._msg_processing_paused:
self._buffer = output
return
lines = output.split('\n') # Different from splitlines(): adds a final '' element after '\n'
self._buffer = lines.pop()
for line in lines:
if not line or line.isspace():
continue
try:
json_data = json.loads(line)
envelope = json_data['envelope']
except (json.JSONDecodeError, KeyError) as err:
logging.error('Could not parse daemon output: %s', line)
logging.exception(err)
return
logging.debug("Daemon: json_data = \n%s", pprint.pformat(json_data))
error_data = None
for error_key in ('error', 'exception'):
try:
error_data = json_data[error_key]
except KeyError:
continue
self._error_data_handler(error_data, envelope)
if error_data is None:
self._envelope_handler(envelope)
def _daemon_stderr_handler(self, output):
line = output.decode().strip()
if not line:
return
logging.info('daemon_log: %s', line)
self.callbacks['daemon_log'](line)
if any(s in line for s in (
"Exported dbus object: /org/asamk/Signal", # signal-cli v0.9.2 or earlier
"DBus daemon running",
)):
self._run_when_dbus_service_started(
self.callbacks['daemon_started']
)
def _envelope_handler(self, envelope):
    """Stamp the envelope with the time of reception and pass it to the callback matching its type.

    The checks are made in a fixed order; only the first matching callback is run.
    """
    envelope['_received_timestamp'] = get_current_timestamp_ms()
    callbacks = self.callbacks

    if get_envelope_msg(envelope) or get_envelope_attachments(envelope):
        is_sync = get_nested(envelope, 'syncMessage', 'sentMessage') is not None
        callbacks['receive_sync_message' if is_sync else 'receive_message'](envelope)
        return
    if envelope.get('receiptMessage') is not None:
        # In signal-cli >=0.7.3, above check can be replaced with just
        #   'receiptMessage' in envelope
        # Keeping `is not None` for compatibility with envelopes in history from older signal-cli versions.
        callbacks['receive_receipt'](envelope)
        return
    if 'typingMessage' in envelope:
        callbacks['contact_typing'](envelope)
        return
    if get_envelope_reaction(envelope):
        callbacks['receive_reaction'](envelope)
        return
    if envelope.get('callMessage') is not None:
        callbacks['call_message'](envelope)
        return
    if get_nested(envelope, 'syncMessage', 'type') in ('CONTACTS_SYNC', 'GROUPS_SYNC'):
        callbacks['contacts_sync']()
        return
    if get_envelope_data_val(envelope, 'groupInfo', 'type') == 'UPDATE':
        callbacks['contacts_sync']()
        return
    if get_envelope_remote_delete(envelope):
        callbacks['remote_delete'](envelope)
        return
    if get_envelope_sticker(envelope):
        callbacks['receive_sticker'](envelope)
        return
    logging.info('No action for received envelope: %s', pprint.pformat(envelope))
def _error_data_handler(self, error_data, envelope):
logging.error("Daemon: error = \n%s", pprint.pformat(error_data))
if error_data.get('type') == 'UntrustedIdentityException':
self.callbacks['untrusted_identity_err'](envelope)
def pause_message_processing(self):
    """Make `_daemon_stdout_handler()` only buffer the daemon's output, without processing it."""
    self._msg_processing_paused = True
def unpause_message_processing(self):
    """Resume message processing, starting with the output buffered while paused."""
    self._msg_processing_paused = False
    # An empty chunk makes the handler go through whatever is in the buffer.
    self._daemon_stdout_handler(b'')
def _dbus_send(self, args, *proc_args, async_proc=True, **proc_kwargs):
args = [
'dbus-send',
'--session',
'--type=method_call',
'--print-reply=literal',
*args
]
if async_proc:
proc = self.run(args, *proc_args, **proc_kwargs)
else:
proc = subprocess.run(args, *proc_args, **proc_kwargs)
return proc
def _dbus_send_signal_cli(self, args, *proc_args, **proc_kwargs):
""" Send a command to signal-cli daemon through dbus """
args = [
'--dest=org.asamk.Signal',
'/org/asamk/Signal',
*args
]
return self._dbus_send(args, *proc_args, **proc_kwargs)
def _send_message_dbus_cmd(self, message, attachments, recipient, is_group=False, *proc_args, **proc_kwargs):
args = [
('org.asamk.Signal.sendMessage'
if not is_group else
'org.asamk.Signal.sendGroupMessage'),
'string:' + message,
'array:string:' + ','.join(attachments),
('string:' + recipient
if not is_group else
'array:byte:' + b64_to_bytearray(recipient))
]
self._dbus_send_signal_cli(args, *proc_args, **proc_kwargs)
def _send_reaction_dbus_cmd(self, emoji, remove, target_author, target_sent_timestamp, recipient, is_group=False, *proc_args, **proc_kwargs):
dbus_args = [
('org.asamk.Signal.sendMessageReaction'
if not is_group else
'org.asamk.Signal.sendGroupMessageReaction'),
"string:" + emoji,
"boolean:" + str(remove).lower(),
"string:" + target_author,
"int64:" + str(target_sent_timestamp),
('string:' + recipient
if not is_group else
'array:byte:' + b64_to_bytearray(recipient))
]
self._dbus_send_signal_cli(dbus_args, *proc_args, **proc_kwargs)
def _send_remote_delete_dbus_cmd(self, target_sent_timestamp, recipient, is_group=False, *proc_args, **proc_kwargs):
dbus_args = [
('org.asamk.Signal.sendRemoteDeleteMessage'
if not is_group else
'org.asamk.Signal.sendGroupRemoteDeleteMessage'),
"int64:" + str(target_sent_timestamp),
('string:' + recipient
if not is_group else
'array:byte:' + b64_to_bytearray(recipient))
]
self._dbus_send_signal_cli(dbus_args, *proc_args, **proc_kwargs)
def _parse_send_proc_output(self, proc, envelope, callback_name):
if proc.returncode != 0:
if any(s in proc.output for s in (
"UntrustedIdentity",
"Untrusted Identity",
)):
self.callbacks['untrusted_identity_err'](envelope)
elif "Unregistered user" in proc.output:
# Related signal-cli issues: #348, #828.
if not is_envelope_group_message(envelope):
self.callbacks['user_unregistered_err'](envelope)
else:
# Ad-hoc parsing of signal-cli's stderr output.
timestamp_adj = int(proc.output.splitlines()[0].rsplit(': ', 1)[-1])
logging.warning("some group members have uninstalled signal: %s", proc.output)
self.callbacks[callback_name](envelope, 'ignore_receipts', timestamp_adj)
return
self.callbacks[callback_name](envelope, 'send_failed')
return
# Set envelope timestamp to that returned by signal-cli
try:
timestamp_adj = int(proc.output.rsplit(maxsplit=1)[1])
except (IndexError, AttributeError) as err:
logging.error("send_message: Failed to get adjusted envelope timestamp")
logging.exception(err)
self.callbacks[callback_name](envelope)
else:
self.callbacks[callback_name](envelope, 'sent', timestamp_adj)
def send_message(self, contact_id, message="", attachments=None):
    """Send a text message and / or attachments to a contact or a group.

    Nothing is sent (only a warning logged) if any attached file does not
    exist. Otherwise the `sending_message` callback is run right away with
    the envelope built for the message, and `sending_done` (through
    `_parse_send_proc_output()`) once the send command has finished.
    """
    to_group = not is_number(contact_id)
    if attachments is None:
        attachments = []
    expanded_paths = [os.path.expanduser(path) for path in attachments]
    if any(not os.path.exists(path) for path in expanded_paths):
        logging.warning('send_message: Attached file(s) does not exist.')
        return

    sent_at = get_current_timestamp_ms()
    data_message = {
        'message': message,
        'attachments': attachments,
        'timestamp': sent_at,
    }
    envelope = {
        'source': self._username,
        'target': contact_id,
        'timestamp': sent_at,
        'dataMessage': data_message,
    }

    def after_send_proc_returns(proc):
        # Remove temporary attachments
        for sent_file in envelope['dataMessage']['attachments']:
            is_temporary = sent_file.startswith(
                os.path.join(tempfile.gettempdir(), TEMPFILE_PREFIX)
            )
            if is_temporary:
                os.remove(sent_file)
        # Check if send command succeeded
        self._parse_send_proc_output(proc, envelope, 'sending_done')

    self._send_message_dbus_cmd(
        message,
        expanded_paths,
        contact_id,
        to_group,
        callback=after_send_proc_returns,
    )
    logging.info('send_message: %s', envelope)
    self.callbacks['sending_message'](envelope)
def send_reaction(self, contact_id, emoji, orig_author, orig_ts, remove=False):
    """Send (or, with `remove`, retract) an emoji reaction to the message of `orig_author` sent at `orig_ts`.

    The `sending_reaction` callback is run right away with the envelope built
    for the reaction, and `sending_reaction_done` (through
    `_parse_send_proc_output()`) once the send command has finished.
    """
    to_group = not is_number(contact_id)
    sent_at = get_current_timestamp_ms()
    reaction = {
        'emoji': emoji,
        'isRemove': remove,
        'targetAuthor': orig_author,
        'targetAuthorNumber': orig_author,
        'targetSentTimestamp': orig_ts,
    }
    data_message = {
        'message': None,
        'timestamp': sent_at,
        'reaction': reaction,
    }
    if to_group:
        data_message['groupInfo'] = {
            'groupId': contact_id,
        }
    envelope = {
        'source': self._username,
        'target': contact_id,
        'timestamp': sent_at,
        'dataMessage': data_message,
    }

    def after_send_proc_returns(proc):
        self._parse_send_proc_output(proc, envelope, 'sending_reaction_done')

    self._send_reaction_dbus_cmd(
        emoji,
        remove,
        orig_author,
        orig_ts,
        contact_id,
        to_group,
        callback=after_send_proc_returns
    )
    logging.info('send_reaction: %s', envelope)
    self.callbacks['sending_reaction'](envelope)
def send_remote_delete(self, contact_id, orig_ts):
    """Remotely delete the message sent at `orig_ts` in the conversation with `contact_id`.

    The `remote_delete` callback is run right away with the envelope built
    for the deletion, and `sending_remote_delete_done` (through
    `_parse_send_proc_output()`) once the send command has finished.
    """
    to_group = not is_number(contact_id)
    sent_at = get_current_timestamp_ms()
    data_message = {
        'message': None,
        'timestamp': sent_at,
        'remoteDelete': {
            'timestamp': orig_ts,
        },
    }
    if to_group:
        data_message['groupInfo'] = {
            'groupId': contact_id,
        }
    envelope = {
        'source': self._username,
        'target': contact_id,
        'timestamp': sent_at,
        'dataMessage': data_message,
    }

    def after_send_proc_returns(proc):
        return self._parse_send_proc_output(proc, envelope, 'sending_remote_delete_done')

    self._send_remote_delete_dbus_cmd(
        orig_ts,
        contact_id,
        to_group,
        callback=after_send_proc_returns
    )
    self.callbacks['remote_delete'](envelope)
def rename_contact(self, contact_id, new_name, is_group=False, *proc_args, **proc_kwargs):
"""Rename a contact or group.
If a contact does not exist, it will be created. Changes to groups are sent to the server, changes to individual contacts are local.
"""
if not is_group:
args = [
"org.asamk.Signal.setContactName",
"string:" + contact_id,
"string:" + new_name,
]
else:
args = [
"org.asamk.Signal.updateGroup",
"array:byte:" + b64_to_bytearray(contact_id),
"string:" + new_name,
"array:string:" + '', # members
"string:" + '' # avatar
]
self._dbus_send_signal_cli(args, *proc_args, **proc_kwargs)
def _get_group_name(self, group_id, callback, *cb_args, **cb_kwargs):
    """Ask signal-cli for a group's name and pass it to `callback(name, *cb_args, **cb_kwargs)`.

    If the reply is blank, the first 10 characters of the group id followed
    by `[..]` are used as the name.
    """
    def on_reply(proc):
        reply = proc.output.strip()
        name = reply if reply else group_id[:10] + '[..]'
        callback(name, *cb_args, **cb_kwargs)

    dbus_args = [
        "org.asamk.Signal.getGroupName",
        "array:byte:" + b64_to_bytearray(group_id),
    ]
    self._dbus_send_signal_cli(dbus_args, callback=on_reply)
def _get_group_members(self, group_id, callback, *cb_args, **cb_kwargs):
    """Ask signal-cli for a group's members and pass the set of their ids to `callback(members_ids, *cb_args, **cb_kwargs)`."""
    def on_reply(proc):
        # Ad hoc parsing of `dbus-send` output
        listed = proc.output[10:-1]
        callback(set(listed.split()), *cb_args, **cb_kwargs)

    dbus_args = [
        "org.asamk.Signal.getGroupMembers",
        "array:byte:" + b64_to_bytearray(group_id),
    ]
    self._dbus_send_signal_cli(dbus_args, callback=on_reply)
def get_groups_ids(self, callback, *cb_args, **cb_kwargs):
def proc_callback(proc):
groups_ids = set()
bytearray_line_continued = False
for line in proc.output.splitlines():
# Ad hoc parsing of `dbus-send` output
if line.endswith('array of bytes ['):
gid_bytes_array = ''
bytearray_line_continued = True
elif bytearray_line_continued:
if line.startswith(" ]"):
bytearray_line_continued = False
groups_ids.add(hex_str_to_b64(gid_bytes_array))
else:
gid_bytes_array += line.strip()
callback(groups_ids, *cb_args, **cb_kwargs)
args = ["org.asamk.Signal.listGroups"]
self._dbus_send_signal_cli(args, callback=proc_callback)
def populate_groups_dict(self, groups_ids, callback, **cb_kwargs):
    """Build a `{group_id: record}` dict, requesting name and members for every group in `groups_ids`.

    Every record starts as `{'groupId': group_id}`; the 'name' and 'members_ids' keys are filled in by the callbacks given to `_get_group_name()` and `_get_group_members()`.
    `callback`, the dict (as the `groups_dict` keyword) and `cb_kwargs` are handed to `self.callback_finally()`, whose context wraps all the requests.
    """
    def make_setter(record, key):
        # Bind `record` and `key` now, so that each request updates its own group.
        return lambda value: record.update({key: value})

    groups_dict = {}
    with self.callback_finally(callback, groups_dict=groups_dict, **cb_kwargs):
        for group_id in groups_ids:
            record = {'groupId': group_id}
            groups_dict[group_id] = record
            self._get_group_name(group_id, callback=make_setter(record, 'name'))
            self._get_group_members(group_id, callback=make_setter(record, 'members_ids'))
def _get_contacts_numbers(self, callback, *cb_args, **cb_kwargs):
def proc_callback(proc):
try:
# Ad-hoc parsing of `dbus-send` output
numbers = proc.output.splitlines()[1][:-1].split()
except IndexError:
numbers = []
callback(numbers, *cb_args, **cb_kwargs)
args = ["org.asamk.Signal.listNumbers"]
self._dbus_send_signal_cli(args, callback=proc_callback)
def _get_contact_name(self, phone_num, callback, *cb_args, **cb_kwargs):
def proc_callback(proc):
name = proc.output.strip()
callback(phone_num, name, *cb_args, **cb_kwargs)
args = [
"org.asamk.Signal.getContactName",
"string:" + phone_num
]
self._dbus_send_signal_cli(args, callback=proc_callback)
def get_indiv_contacts(self, callback, *cb_args, **cb_kwargs):
    """Collect the individual (non-group) contacts into a `{number: {'name': ..., 'number': ...}}` dict.

    The numbers are requested first; then a name is requested for every number, inside the context returned by `self.callback_finally()`.
    `callback`, the dict (as the `contacts_dict` keyword), `cb_args` and `cb_kwargs` are all handed to `self.callback_finally()`.
    """
    contacts_dict = {}

    def store_name(phone_num, name):
        contacts_dict[phone_num] = {"name": name, "number": phone_num}

    def request_names(numbers):
        finalizer = self.callback_finally(
            *cb_args,
            callback=callback,
            contacts_dict=contacts_dict,
            **cb_kwargs,
        )
        with finalizer:
            for phone_num in numbers:
                self._get_contact_name(phone_num, callback=store_name)

    self._get_contacts_numbers(callback=request_names)
def get_signal_cli_version(self, callback, *cb_args, **cb_kwargs):
    """Request signal-cli's version over dbus and pass a "signal-cli <version>" string to `callback`.

    `callback` is called as `callback(version_string, *cb_args, **cb_kwargs)`.
    """
    def on_proc_done(proc):
        callback("signal-cli " + proc.output.strip(), *cb_args, **cb_kwargs)

    self._dbus_send_signal_cli(["org.asamk.Signal.version"], callback=on_proc_done)
@property
def is_dbus_service_running(self):
    """Whether the string "org.asamk.Signal" occurs in the output of the dbus `ListNames` call.

    The call is made through `self._dbus_send()` with `async_proc=False` and `check=True`.
    """
    list_names_args = [
        '--dest=org.freedesktop.DBus',
        '/org/freedesktop/DBus',
        'org.freedesktop.DBus.ListNames',
    ]
    proc = self._dbus_send(
        list_names_args,
        async_proc=False,
        capture_output=True,
        text=True,
        check=True,
    )
    return "org.asamk.Signal" in proc.stdout
def _run_when_dbus_service_started(self, callback):
poll_freq = 1 # seconds between polls
def set_alarm(main_loop, _user_data=None):
if self.is_dbus_service_running:
callback()
else:
main_loop.set_alarm_in(poll_freq, set_alarm)
set_alarm(self.main_loop)
# #############################################################################
# signal-cli data
# #############################################################################
class SignalData:
    """The contents of signal-cli's data file for the account `username`.

    The file is located and parsed once, when the object is created.
    """

    def __init__(self, username):
        self._username = username
        self._file_path = self._get_account_file_path()
        self._data = self._read_data_file()

    @staticmethod
    def parse_accounts_json():
        """Return the value of the "accounts" key in `accounts.json`, or an empty list if that file does not exist."""
        index_path = os.path.join(SIGNALCLI_DATA_FOLDER, "accounts.json")
        try:
            with open(index_path, encoding="utf-8") as index_file:
                index = json.load(index_file)
        except FileNotFoundError:
            return []
        return index["accounts"]

    def _get_account_file_path(self):
        """Return the path of this account's data file.

        The entries of `accounts.json` are searched first; after that, a file named after the username is looked for in the current and then in the legacy signal-cli data folders.

        Raises:
            FileNotFoundError: if none of the candidate locations exists.
        """
        for account in self.parse_accounts_json() or ():
            if account["number"] == self._username:
                return os.path.join(SIGNALCLI_DATA_FOLDER, account["path"])
        for folder in (SIGNALCLI_DATA_FOLDER, SIGNALCLI_LEGACY_DATA_FOLDER):
            candidate = os.path.join(folder, self._username)
            if os.path.exists(candidate):
                return candidate
        raise FileNotFoundError(self._username + " does not exist!")

    def _read_data_file(self):
        """Parse the account's data file as JSON and return the result."""
        with open(self._file_path, encoding="utf-8") as data_file:
            parsed = json.load(data_file)
        return parsed

    @property
    def own_num(self):
        """The username (phone number) this object was created for."""
        return self._username

    @property
    def is_linked_device(self):
        """True unless the data file's `deviceId` is 1."""
        # The primary device should have a `deviceId == 1`
        device_id = self._data['deviceId']
        return device_id != 1
class Contact:
    """An individual contact or a group, backed by a `record` dict.

    Keys of the record can be read as attributes (`contact.name`, `contact.number`, ...); a key that is absent reads as `None`.
    """
    # A `Contact` can be either an individual contact or a group.
    # This class uses the _record dict with contact's details, which is what is obtained from contactsStore and groupsStore in signal-cli data file's json structure.

    def __init__(self, record):
        self._record = record
        self.avatar = self._get_avatar_file_path()
        if self.is_group:
            self.members_ids = record.get('members_ids') or set()
            self.member_contacts = set()

    def __getattr__(self, attr):
        """Return `record[attr]`, or `None` if the record has no such key.

        Python calls this only after the normal attribute lookup has failed.

        Raises:
            AttributeError: for `_record` itself and for dunder names.
        """
        # `_record` can only reach this method when it has not been set yet (e.g. on an instance created by `__new__`, `copy` or `pickle`); looking it up on `self` again would recurse without end.
        # Dunder names are protocol probes (`__setstate__`, `__deepcopy__`, ...), which must not be answered with `None`.
        if attr == '_record' or (attr.startswith('__') and attr.endswith('__')):
            raise AttributeError(attr)
        return self._record.get(attr)

    def update_record(self, update_dict):
        """Merge `update_dict` into the contact's record."""
        self._record.update(update_dict)

    def _get_avatar_file_path(self):
        """Return the path of an existing avatar file for this contact, or `None` if there is none."""
        # Might be implemented in the future by signal-cli: https://github.com/AsamK/signal-cli/issues/869
        def get_path(file_prefix, contact_id):
            path = os.path.join(
                SIGNALCLI_AVATARS_FOLDER,
                f'{file_prefix}-{contact_id}',
            )
            return path if os.path.exists(path) else None

        if self.is_group:
            # "/" in a group id is replaced with "_" to form the file name.
            return get_path('group', self.id.replace("/", "_"))

        for file_prefix in ('profile', 'contact'):
            path = get_path(file_prefix, self.id)
            if path is not None:
                return path
        return None

    @property
    def is_group(self):
        """Result of `is_contact_group()` for the record."""
        return is_contact_group(self._record)

    @property
    def is_group_v2(self):
        """Result of `is_group_v2()` for the record."""
        return is_group_v2(self._record)

    @property
    def id(self):
        """Result of `get_contact_id()` for the record."""
        return get_contact_id(self._record)

    @property
    def name_or_id(self):
        """The contact's name if it is non-empty, otherwise its id."""
        return self.name or self.id

    def serialize(self):
        """Return the underlying record dict (the same object, not a copy)."""
        return self._record
class Contacts:
    """The collection of all `Contact` objects.

    `indivs` and `groups` are sets of individual and group contacts; `map` maps the keys of the supplied contacts dict to the same objects.
    """

    def __init__(self, sigdata, contacts_cache=None):
        self.sigdata = sigdata
        self._clear()
        self.update(contacts_cache)

    def _clear(self):
        """Start over with empty containers."""
        self.indivs, self.groups, self.map = set(), set(), {}

    def update(self, contacts_dict=None, clear=False):
        """Create a `Contact` for every record in `contacts_dict`, optionally dropping the current contacts first."""
        if clear:
            self._clear()
        if not contacts_dict:
            return
        for cid, record in contacts_dict.items():
            contact = Contact(record)
            self.map[cid] = contact
            bucket = self.groups if is_contact_group(record) else self.indivs
            bucket.add(contact)

    def set_groups_membership(self):
        """For every group, drop own number from `members_ids` and rebuild `member_contacts`."""
        own_num = None
        for group in self.groups:
            own_num = self.sigdata.own_num
            group.members_ids.discard(own_num)
            group.member_contacts = self._get_group_members(group)

    # Naming: was (historically), group.members == group._record['members'] (from signal-cli data)
    def _get_group_members(self, group):
        """Return the set of `Contact` objects for the ids in `group.members_ids`."""
        def contact_for(member_id):
            known = self.map.get(member_id)
            if known is not None:
                return known
            # Some members of a group might not be in my `contacts`, so they have no Contact obj associated with them.
            return Contact({"number": member_id})

        return {contact_for(member_id) for member_id in group.members_ids}

    def get_by_id(self, contact_id):
        """Return the contact stored in `map` under `contact_id`, or `None`."""
        return self.map.get(contact_id)

    def serialize(self):
        """Return a `{contact.id: contact.serialize()}` dict of all the contacts in `map`."""
        serialized = {}
        for contact in self.map.values():
            serialized[contact.id] = contact.serialize()
        return serialized
# #############################################################################
# chats data
# #############################################################################
class Message:
    """A message in a chat: a thin wrapper around an envelope dict.

    Messages are ordered by `local_timestamp` and are equal when their envelopes are equal.
    """

    # Class-level hooks; they stay `noop` until set_class_functions() installs the real functions.
    _get_delivery_status = noop
    _get_contact = noop

    @classmethod
    def set_class_functions(cls, get_delivery_status, get_contact):
        """Install the functions used to look up a delivery status (by timestamp) and a contact (by id)."""
        cls._get_delivery_status = get_delivery_status
        cls._get_contact = get_contact

    __slots__ = ("envelope", "reactions", "remote_delete")

    def __init__(self, envelope):
        self.envelope = envelope

    # The comparison methods return `NotImplemented` for objects lacking the compared attribute, so that Python can fall back to its default handling instead of raising AttributeError from here.
    def __eq__(self, other_msg):
        try:
            other_envelope = other_msg.envelope
        except AttributeError:
            return NotImplemented
        return self.envelope == other_envelope

    def __lt__(self, other_msg):
        try:
            other_ts = other_msg.local_timestamp
        except AttributeError:
            return NotImplemented
        return self.local_timestamp < other_ts

    def __le__(self, other_msg):
        try:
            other_ts = other_msg.local_timestamp
        except AttributeError:
            return NotImplemented
        return self.local_timestamp <= other_ts

    @property
    def timestamp(self):
        """The envelope's timestamp, as returned by `get_envelope_time()`."""
        return get_envelope_time(self.envelope)

    @timestamp.setter
    def timestamp(self, ts_new):
        # NOTE: For Message in Chat, use Chat.adjust_timestamp(), rather than this setter directly, to ensure that Chat remains sorted.
        self.envelope['timestamp'] = self.envelope['dataMessage']['timestamp'] = ts_new

    @property
    def local_timestamp(self):
        """The envelope's `_received_timestamp` if it is set and non-zero, otherwise `timestamp`."""
        return self.envelope.get('_received_timestamp') or self.timestamp

    @property
    def text(self):
        """The message text, with mentions substituted if there are any."""
        if self.mentions:
            return self.text_w_mentions()
        else:
            return get_envelope_msg(self.envelope)

    @property
    def attachments(self):
        return get_envelope_attachments(self.envelope)

    @property
    def mentions(self):
        return get_envelope_mentions(self.envelope)

    @property
    def delivery_status(self):
        """The status string: looked up by timestamp for outgoing messages, 'received_by_me' for all others."""
        if is_envelope_outgoing(self.envelope):
            return self._get_delivery_status(self.timestamp).str
        else:
            return 'received_by_me'

    @property
    def delivery_status_detailed(self):
        """Whatever the delivery-status hook returns for this message's timestamp."""
        return self._get_delivery_status(self.timestamp)

    @property
    def contact_id(self):
        return get_envelope_contact_id(self.envelope)

    @property
    def sender_num(self):
        return get_envelope_sender_id(self.envelope)

    @property
    def sender(self):
        """Whatever the contact hook returns for `sender_num`."""
        return self._get_contact(self.sender_num)

    def add_reaction(self, envelope):
        """Store a reaction envelope, keyed by its sender id; a newer reaction from the same sender replaces the older one."""
        self.reactions = getattr(self, 'reactions', {})  # pylint: disable=attribute-defined-outside-init
        # Don't want to add `reactions` attribute to every Message instance; only to those that actually have reactions.
        self.reactions[get_envelope_sender_id(envelope)] = envelope

    @classmethod
    def text_w_mentions_generic(cls, text, mentions, bracket_char=''):
        """Return `text` with every mention replaced by "@<contact name>", wrapped in `bracket_char`.

        Each mention is taken to occupy exactly one character of `text`, at `mention['start']`.
        """
        # See also: What is the Mention's "length" parameter?
        # https://github.com/AsamK/signal-cli/discussions/409
        parts = []
        pos = 0
        for mention in mentions:
            contact_num = mention['name']
            contact = cls._get_contact(contact_num)
            contact_name = contact.name_or_id if contact else contact_num
            start = mention['start']
            # Collect the pieces and join them once at the end, rather than rebuilding the whole string for every mention.
            parts.extend((
                text[pos:start],
                bracket_char,
                "@", contact_name,
                bracket_char,
            ))
            pos = start + 1
        parts.append(text[pos:])
        return ''.join(parts)

    def text_w_mentions(self, bracket_char=''):
        """Return this message's text with its mentions substituted; see text_w_mentions_generic()."""
        text = get_envelope_msg(self.envelope)
        return self.text_w_mentions_generic(text, self.mentions, bracket_char)

    @property
    def not_repliable(self):
        """Truthy for typing indicators, call messages and remotely deleted messages."""
        envelope = self.envelope
        return (
            'typingMessage' in envelope
            or envelope.get('callMessage') is not None
            or getattr(self, 'remote_delete', None)
        )
class ReorderedTimestamps(list):
    """A list of messages that also records which of them are positioned by their received timestamp.

    `_reordered_timestamps` maps `(sender_num, original timestamp)` to the message's `_received_timestamp`; see _add_reordered_neighbors() for when an entry is made.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._reordered_timestamps = {}

    def _is_neighbd_monotonic(self, index):
        """Return True if the item at `index` is in order relative to the neighbors that it actually has.

        Negative indices count from the end. True is returned for an index outside of the list (including any index of an empty list).
        """
        size = len(self)
        if index < 0:
            index += size
        if not 0 <= index < size:
            return True
        # The first item has no left neighbor and the last one has no right neighbor; comparing them to `self[index-1]` / `self[index+1]` regardless would wrap around to the other end of the list or raise IndexError.
        if index > 0 and not self[index-1] <= self[index]:
            return False
        if index < size - 1 and not self[index] <= self[index+1]:
            return False
        return True

    def _add_reordered_neighbors(self, index):
        """Record the reordered messages among the item at `index` and its immediate neighbors.

        For every adjacent pair, each of the two messages is tried as the `received` one: if it has a `_received_timestamp` and the other message's `local_timestamp` lies between its original and received timestamps (inclusive), an entry is added for it.
        """
        # When calling this method, ensure that len(self) > 1 and index == -1 for the last element (not len(self)-1).
        if index == -1:
            pre_post_pairs = ((self[-2], self[-1]), )
        elif index == 0:
            pre_post_pairs = ((self[0], self[1]), )
        else:
            pre_post_pairs = (
                (self[index-1], self[index]),
                (self[index], self[index+1]),
            )
        for pre_post in pre_post_pairs:
            for tlr in (True, False):
                local, received = pre_post if tlr else reversed(pre_post)
                received_ts = received.envelope.get('_received_timestamp')
                if received_ts is None:
                    continue
                original_ts = received.timestamp
                left_right = (original_ts, received_ts)
                left, right = left_right if tlr else reversed(left_right)
                if left <= local.local_timestamp <= right:
                    self._reordered_timestamps[
                        (received.sender_num, original_ts)
                    ] = received_ts
                    # Including sender_num in the dict's key to prevent collisions in group chats between same-timestamp, different-sender messages.

    def _delete_from_reordered(self, msg, index):
        """Update the reordered records after `msg`, formerly at `index`, has been removed from the list."""
        # This method is assumed to be run *after* the msg itself is already deleted.
        self._reordered_timestamps.pop((msg.sender_num, msg.timestamp), None)
        edge = True
        if index == len(self):
            # Assuming the supplied index != -1. However _add_reordered_neighbors() requires -1.
            index = -1
        elif index:
            index -= 1
            edge = False
        modified = False
        # NOTE(review): in the non-edge case the items examined are at `index-1` and `index` *after* the decrement above, i.e. the two items preceding the deleted one - confirm that this is the intended pair.
        for ioffset in range(1) if edge else range(-1, 1):
            try:
                neighb = self[index+ioffset]
                del self._reordered_timestamps[(neighb.sender_num, neighb.timestamp)]
                modified = True
            except (IndexError, KeyError):
                pass
        # _add_reordered_neighbors() needs at least two items; a single remaining message has no neighbors to be reordered against.
        if modified and len(self) > 1:
            self._add_reordered_neighbors(index)
class Chat(ReorderedTimestamps, urwid.MonitoredList):
    """The messages of one conversation, kept sorted (by the messages' own ordering)."""
    # An `urwid.MonitoredList` is a subclass of a regular `list`, that modifies the "mutating" (modifying `self`) methods, so that they call the `self._modified()` method at the end.
    # The `self._modified()` method is set to simply do `pass`, until a callback is assigned to it in ListWalker's __init__.

    def index(self, msg):
        """More efficient way to locate an object in the sorted list than just using super().index() method.

        Since Chat should always be sorted, a member object can be located faster using bisect.

        Raises:
            ValueError: if the chat is empty or has no message equal to `msg` at the position found.
        """
        try:
            msg_last = self[-1]
        except IndexError as exc:
            # Return "message-not-found" when chat history is blank.
            raise ValueError from exc
        if msg_last == msg:
            # First check the last msg before doing bisect_left, as bisect starts in the middle. See also comment in self.add()
            return len(self) - 1
        index = bisect.bisect_left(self, msg)
        if index != len(self) and self[index] == msg:
            return index
        raise ValueError

    def index_ts(self, timestamp, sender_num=None):
        """Return an index of a message in Chat with a given timestamp, from a given phone number.

        With `sender_num=None`, a message from any sender matches.

        Raises:
            ValueError: if the chat is empty or no matching message is found.
        """
        def match_test(msg):
            return (
                msg.timestamp == timestamp
                and
                (msg.sender_num == sender_num or sender_num is None)
            )

        try:
            msg = self[-1]
        except IndexError as exc:
            # Return "message-not-found" when chat history is blank.
            raise ValueError from exc
        if match_test(msg):
            # First check the last msg before doing bisect_left, as bisect starts in the middle. See also comment in self.add()
            return len(self) - 1
        # Search by the received timestamp if the message is recorded as reordered, otherwise by the timestamp itself.
        dummy_message = Message({'timestamp':
            self._reordered_timestamps.get((sender_num, timestamp))
            or timestamp
        })
        index = bisect.bisect_left(self, dummy_message)
        if index != len(self):
            for ind in (i for r in ((index, index-1), range(index+1, len(self))) for i in r):
                # This generator expression is a "re-implementation" of itertools.chain().
                # The indices are ordered with the more likely matches tested first.
                # The range(..) checks for msgs with the same timestamp (e.g. msgs in a group chat from different senders).
                # index-1 might be a match if it is not in _reordered_timestamps and its _received_timestamp < timestamp.
                msg = self[ind]
                if match_test(msg):
                    return ind
                if msg.timestamp > timestamp and ind > index:
                    break
        raise ValueError

    def get_index_for_envelope(self, envelope):
        """Return the index of the message that wraps an envelope equal to `envelope`; raises ValueError if there is none."""
        dummy_message = Message(envelope)
        index = self.index(dummy_message)
        return index

    def get_msg_for_envelope(self, envelope):
        """Return the message that wraps an envelope equal to `envelope`; raises ValueError if there is none."""
        index = self.get_index_for_envelope(envelope)
        return self[index]

    def get_msg_for_timestamp(self, timestamp, sender_num=None):
        """Return the message found by index_ts(); raises ValueError if there is none."""
        ind = self.index_ts(timestamp, sender_num)
        return self[ind]

    def add(self, msg):
        """Insert `msg` at the position that keeps the chat sorted."""
        index = -1
        # Not using len(self)-1 because after self.append() it will shift.
        try:
            msg_last = self[index]
        except IndexError:
            # The chat is empty
            self.append(msg)
            return
        if msg_last <= msg:
            # Check first if the message should be appended at the end of Chat container.
            # This is the case for most of the messages. The exceptions might occur if the system's clock has been moved back.
            # `bisect` starts searching for the place for new item from the middle of the container, which takes more steps.
            self.append(msg)
        else:
            index = bisect.bisect(self, msg)
            self.insert(index, msg)
        self._add_reordered_neighbors(index)

    def delete(self, msg, index=None):
        """Remove `msg` from the chat.

        Raises:
            ValueError: if the message is not found (or `index` is out of range).
        """
        # The `index` is optional, but if known, will save cpu cycles for finding the message in chat.
        try:
            if index is None:
                index = self.index(msg)
            del self[index]
        except (ValueError, IndexError) as err:
            logging.info("Chat.delete(): message not found; envelope = %s", msg.envelope)
            logging.exception(err)
            raise ValueError from err
        self._delete_from_reordered(msg, index)

    def adjust_timestamp(self, msg, timestamp_adj, index=None):
        """Adjust message's timestamp, ensuring that Chat remains sorted.

        `index` is the message's position, if known; 0 is a valid value. As before, the timestamp is set even when the message can not be found and ValueError is raised.
        """
        try:
            if index is None:
                # Locate the message before touching its timestamp: index() relies on bisect, i.e. on the chat being sorted, and the new timestamp might put this message out of order.
                index = self.index(msg)
        finally:
            msg.timestamp = timestamp_adj
        # Ensure that Chat remains sorted
        # This should rarely be necessary, as signal-cli's timestamp adjustments are small enough (~50ms) to not modify the messages' order.
        if not self._is_neighbd_monotonic(index):
            logging.debug("Chat: moving msg to maintain sorted history: %s", timestamp_adj)
            self.delete(msg, index)
            self.add(msg)
class Chats:
    """All the chats, keyed by contact id. Looking up a contact id that has no chat yet creates an empty `Chat` for it."""

    def __init__(self):
        self._dict = collections.defaultdict(Chat)

    def __getitem__(self, contact_id):
        return self._dict[contact_id]

    def get_chat_for_envelope(self, envelope):
        """Return the chat of the contact id that `get_envelope_contact_id()` gives for `envelope`."""
        contact_id = get_envelope_contact_id(envelope)
        return self._dict[contact_id]

    def get_chat_index_for_envelope(self, envelope):
        """Return a `(chat, index)` pair locating the message for `envelope`.

        Raises:
            ValueError: if the lookup fails; the failure is logged first.
        """
        try:
            chat = self.get_chat_for_envelope(envelope)
            return chat, chat.get_index_for_envelope(envelope)
        except (KeyError, ValueError, IndexError) as err:
            logging.error("get_msg_for_envelope(): envelope = %s", envelope)
            logging.exception(err)
            raise ValueError from err

    def get_msg_for_envelope(self, envelope):
        """Return the message for `envelope`; raises ValueError if it is not found."""
        chat, position = self.get_chat_index_for_envelope(envelope)
        return chat[position]

    def get_msg_for_timestamp(self, envelope, timestamp, sender_num=None):
        """Return the message with the given timestamp (and sender) from the chat that `envelope` belongs to."""
        return self.get_chat_for_envelope(envelope).get_msg_for_timestamp(timestamp, sender_num)

    def add_envelope(self, envelope):
        """Wrap `envelope` in a `Message`, add it to its chat and return the message."""
        new_msg = Message(envelope)
        self.get_chat_for_envelope(envelope).add(new_msg)
        return new_msg

    def add_reaction_envelope(self, envelope):
        """Attach a reaction to the message it targets; return that message, or `None` if it is not found."""
        reaction = get_envelope_reaction(envelope)
        try:
            target = self.get_msg_for_timestamp(
                envelope,
                timestamp=reaction['targetSentTimestamp'],
                sender_num=reaction['targetAuthor'],
            )
        except ValueError:
            logging.error("Message not found for reaction: %s", pprint.pformat(envelope))
            return None
        target.add_reaction(envelope)
        return target

    def add_remote_delete_envelope(self, envelope):
        """Mark the message targeted by a remote-delete envelope; return that message, or `None` if it is not found."""
        try:
            target = self.get_msg_for_timestamp(
                envelope,
                timestamp=get_envelope_remote_delete(envelope)['timestamp'],
                sender_num=get_envelope_sender_id(envelope),
            )
        except ValueError:
            logging.error("Message not found for remote delete envelope: %s", envelope)
            return None
        target.remote_delete = envelope
        return target

    def delete_message(self, msg, index=None):
        """Delete `msg` from the chat it belongs to; `index` is its position in the chat, if known."""
        self.get_chat_for_envelope(msg.envelope).delete(msg, index)

    def serialize(self):
        """Return a flat list of envelopes: every message's envelope, followed by its reactions and remote-delete envelope if it has them.

        Typing indicators and artificial "untrustedIdentity" envelopes are left out.
        """
        envelopes = []
        for chat in self._dict.values():
            for msg in chat:
                envelope = msg.envelope
                if "typingMessage" in envelope:
                    continue
                if envelope.get("_artificialEnvelope") == "untrustedIdentity":
                    continue
                envelopes.append(envelope)
                # `reactions` and `remote_delete` exist only on the messages that have them.
                with contextlib.suppress(AttributeError):
                    envelopes.extend(msg.reactions.values())
                with contextlib.suppress(AttributeError):
                    # Currently, the "deleted" messages are saved in the history file.
                    envelopes.append(msg.remote_delete)
        return envelopes
class UnreadCounts(collections.defaultdict):
    """A `{contact_id: count}` mapping in which a missing contact reads as 0."""

    def __init__(self, *args, **kwargs):
        super().__init__(int, *args, **kwargs)

    @property
    def total(self):
        """The sum of all the counts."""
        return sum(count for count in self.values())

    def serialize(self):
        """Return a plain dict of the entries whose count is not 0."""
        nonzero = {}
        for contact_id, count in self.items():
            if count != 0:
                nonzero[contact_id] = count
        return nonzero
class DeliveryStatus:
    """The delivery status ('sending', 'sent', 'delivered', 'read', ...) of messages, keyed by message timestamp."""

    DelivReadConts = collections.namedtuple('DelivReadConts', ['delivered', 'read'])

    class DetailedStatus:
        """The status of a single message.

        `str` is the status name and `when` the value taken from a receipt. `grp_memb_remain_un` exists only while receipts from group members are being tracked: a `DelivReadConts` pair of sets, holding the members with a 'delivered' and with a 'read' receipt still outstanding.
        """

        __slots__ = ("str", "when", "grp_memb_remain_un")

        def __init__(self, status='', when=0, grp_memb_remain_un=None):
            self.str = status
            self.when = when
            if grp_memb_remain_un:
                member_sets = (set(contacts or ()) for contacts in grp_memb_remain_un)
                self.grp_memb_remain_un = DeliveryStatus.DelivReadConts(*member_sets)

        def set_grp_memb_status(self, grp_member, status):
            """Register a `status` receipt from `grp_member`.

            Returns the status that the whole message should now be set to, or `None` if it should stay as it is.
            """
            try:
                remain_un = self.grp_memb_remain_un
            except AttributeError:
                return None
            pending = getattr(remain_un, status)
            try:
                pending.remove(grp_member)
            except (KeyError, AttributeError):
                # This happens when 'read' receipt arrives before 'delivered', or after getting multiple copies of the same receipt message.
                pending = remain_un.delivered
                try:
                    pending.remove(grp_member)
                except (KeyError, AttributeError):
                    return None
                if remain_un.read and not remain_un.delivered:
                    return 'delivered'
            if status != 'delivered':
                if any(remain_un):
                    return None
                # No receipts are outstanding any more: stop tracking the members.
                del self.grp_memb_remain_un
                return status
            # A member that the message has been delivered to is now awaited for a 'read' receipt.
            remain_un.read.add(grp_member)
            return None if pending else status

        def serialize(self):
            """Return the slot values as a list, with the empty values at the end left out."""
            values = [getattr(self, slot, None) for slot in self.__slots__]
            # Skip empty values at the end
            while values and not values[-1]:
                values.pop()
            return values

    def _make_markup_map():  # pylint: disable=no-method-argument
        # Runs once, while the class body is executed; returns the `{status: markup}` map and the length of the longest status text.
        status_text = {
            # Order matters: 'higher' status can't be 're-set' to a 'lower' one.
            '': '<<',
            'received_by_me': '>>',
            'sending': '',
            'send_failed': '',
            'sent': '',
            'delivered': '✓✓',
            'read': '✓✓',
            'ignore_receipts': '',
        }
        struck_through = ('read', 'ignore_receipts')
        markup_map = {
            status: ('strikethrough' if status in struck_through else 'bold', text)
            for status, text in status_text.items()
        }
        longest = max(len(text) for text in status_text.values())
        return (markup_map, longest)

    MARKUP_MAP, MARKUP_WIDTH = _make_markup_map()
    MAX_GROUP_SIZE = 15

    def __init__(self):
        self._status_map = {}
        self._buffered = {}
        # The rank of a status is its position in MARKUP_MAP.
        self._status_order = {status: rank for rank, status in enumerate(self.MARKUP_MAP)}
        self.on_status_changed = noop

    def get_detailed(self, timestamp):
        """Return the `DetailedStatus` stored for `timestamp`, or a fresh (not stored) default one."""
        try:
            return self._status_map[timestamp]
        except KeyError:
            return self.DetailedStatus()

    def get_str(self, timestamp):
        """Return the status name for `timestamp`; '' if nothing is stored for it."""
        return self.get_detailed(timestamp).str

    def on_receive_receipt(self, envelope):
        """Process a receipt envelope: set the status, or buffer the receipt for the timestamps that are not known yet."""
        receipt_contact = get_envelope_sender_id(envelope)
        receipt_message = envelope['receiptMessage']
        if receipt_message['isDelivery']:
            status = 'delivered'
        elif receipt_message['isRead']:
            status = 'read'
        else:
            # 'Viewed' receipts are ignored silently; anything else is logged.
            if not receipt_message['isViewed']:
                logging.error('on_receive_receipt: unknown receipt type in envelope %s', envelope)
            return
        when = receipt_message['when']
        for timestamp in receipt_message['timestamps']:
            if timestamp in self._status_map:
                self._set(timestamp, status, when, receipt_contact)
            else:
                # Receipt is received before 'sent' status set (e.g. because receipt received before a `sync` message for a message sent from another device)
                self._buffer_receipt(timestamp, status, receipt_contact)

    def on_sending_message(self, envelope, group_members=None):
        """Set the status to 'sending'; for a message to a group, also start tracking its members."""
        timestamp = get_envelope_time(envelope)
        self._set(timestamp, 'sending')
        if group_members is None:
            return
        self._set_group_members(timestamp, group_members)

    def on_sending_done(self, envelope, status='sent', timestamp_adj=None):
        """Set the status after a send attempt and, unless it failed, move the entry to the adjusted timestamp (if one is given)."""
        timestamp = get_envelope_time(envelope)
        if timestamp not in self._status_map:
            logging.error("DeliveryStatus: on_sending_done(): no corresponding timestamp in _status_map for envelope = %s", envelope)
            return
        self._set(timestamp, status)
        if status != 'send_failed' and timestamp_adj is not None:
            self._adjust_timestamp(timestamp, timestamp_adj)

    def _adjust_timestamp(self, timestamp_orig, timestamp_adj):
        entry = self._status_map.pop(timestamp_orig)
        self._status_map[timestamp_adj] = entry

    def _set(self, timestamp, status, when=None, receipt_contact=None):
        """Raise the status for `timestamp` to `status`; a status that does not rank higher than the current one is ignored."""
        detailed = self._status_map.setdefault(timestamp, self.DetailedStatus())
        rank = self._status_order
        if rank[status] <= rank[detailed.str]:
            return
        tracks_members = getattr(detailed, 'grp_memb_remain_un', False)
        if tracks_members and receipt_contact is not None:
            status = detailed.set_grp_memb_status(receipt_contact, status)
            if status is None:
                return
        logging.info("Setting status = `%s` for timestamp = %s", status, timestamp)
        detailed.str = status
        if when is not None:
            detailed.when = when
        self.on_status_changed(timestamp, status)

    def _set_group_members(self, timestamp, group_members):
        detailed = self._status_map[timestamp]
        if len(group_members) > self.MAX_GROUP_SIZE:
            # Receipts are not tracked for the groups with more than MAX_GROUP_SIZE members.
            self._set(timestamp, 'ignore_receipts')
            return
        detailed.grp_memb_remain_un = self.DelivReadConts(set(group_members), set())

    def _buffer_receipt(self, timestamp, status, contact):
        logging.debug("DeliveryStatus: buffering timestamp = %s", timestamp)
        buffered = self._buffered.setdefault(timestamp, self.DelivReadConts(set(), set()))
        getattr(buffered, status).add(contact)

    def process_buffered_receipts(self, timestamp):
        """Apply the receipts that have been buffered for `timestamp`, then discard them."""
        buffered = self._buffered.get(timestamp)
        if buffered is None:
            return
        logging.debug("Processing buffered receipts: timestamp = %s, self._buffered = %s", timestamp, self._buffered)
        for status in buffered._fields:
            for contact in getattr(buffered, status) or []:
                self._set(timestamp, status, receipt_contact=contact)
        del self._buffered[timestamp]

    def delete(self, timestamp):
        """Forget the status stored for `timestamp`, if any."""
        self._status_map.pop(timestamp, None)

    def dump(self):
        """Return a `{timestamp: serialized status}` dict of the statuses whose serialized form is not empty."""
        serialized = (
            (timestamp, detailed.serialize())
            for timestamp, detailed in self._status_map.items()
        )
        return {timestamp: status for timestamp, status in serialized if status}

    def load(self, status_map):
        """Restore the statuses from a dict produced by dump(); the keys are converted to `int`."""
        for timestamp, fields in status_map.items():
            self._status_map[int(timestamp)] = self.DetailedStatus(*fields)
class TypingIndicators:
    """The "is typing" indicators, shown as temporary messages in chats; at most one per sender."""

    def __init__(self, chats):
        self._chats = chats
        # sender_num -> (message of the indicator, alarm that will remove it)
        self._map = {}
        self.set_alarm_in = self.remove_alarm = noop

    def on_typing_message(self, envelope):
        """Process a typing envelope: drop the sender's current indicator and, for a 'STARTED' action, show a new one."""
        sender_num = get_envelope_sender_id(envelope)
        action = get_nested(envelope, 'typingMessage', 'action')
        self.remove(sender_num)
        if action == 'STARTED':
            self._add(sender_num, envelope)
            return
        if action != 'STOPPED':
            logging.error("on_typing_message: unknown `action` type in %s", envelope)

    def _add(self, sender_num, envelope):
        """Add the indicator to its chat and set an alarm to remove it in 10 seconds."""
        indicator_msg = self._chats.add_envelope(envelope)
        expiry_alarm = self.set_alarm_in(10, lambda *_: self.remove(sender_num))
        self._map[sender_num] = (indicator_msg, expiry_alarm)

    def remove(self, sender_num):
        """Remove the sender's indicator and cancel its alarm; does nothing if the sender has no indicator."""
        entry = self._map.pop(sender_num, None)
        if entry is None:
            return
        indicator_msg, expiry_alarm = entry
        self.remove_alarm(expiry_alarm)
        try:
            self._chats.delete_message(indicator_msg)
        except ValueError:
            logging.info("TypingIndicators: remove: index not found for envelope = %s", indicator_msg.envelope)
class ChatsData:
    """The chats together with the data that goes with them: unread counts, delivery statuses, typing indicators.

    If `history_file` is given, the data is loaded from it on creation and a save is registered to run at exit.
    """

    def __init__(self, history_file):
        self.chats = Chats()
        self.unread_counts = UnreadCounts()
        self.delivery_status = DeliveryStatus()
        self.typing_indicators = TypingIndicators(self.chats)
        self._history = history_file
        self.current_contact = None
        self.contacts_cache = None
        if not self._history:
            return
        self._load_history()
        atexit.register(self._save_history)

    @property
    def current_chat(self):
        """The chat of `current_contact`, or `None` if there is no current contact."""
        if not self.current_contact:
            return None
        return self.chats[self.current_contact.id]

    def _save_history(self):
        """Write the envelopes, unread counts, delivery statuses and contacts cache to the history file, as JSON."""
        class JSONSetEncoder(json.JSONEncoder):
            # Using a custom json encoder to encode `set`s from `DeliveryStatus` group_members.
            def default(self, o):
                if isinstance(o, set):
                    return tuple(o)
                # Anything else is not serializable: the base class raises TypeError.
                return super().default(o)

        items = {
            'version': 5,
            'envelopes': self.chats.serialize(),
            'unread_counts': self.unread_counts.serialize(),
            'delivery_status': self.delivery_status.dump(),
            'contacts_cache': self.contacts_cache,
        }
        with open(self._history, 'w', encoding="utf-8") as history_fileobj:
            json.dump(items, history_fileobj, ensure_ascii=False, cls=JSONSetEncoder, indent=2)

    def _load_history(self):
        """Load the data from the history file, falling back to its `.bak` copy.

        The file that has been read successfully is then renamed to the `.bak` one.
        """
        backup_path = self._history + '.bak'
        history = None
        loaded_from = None
        for candidate in (self._history, backup_path):
            try:
                with open(candidate, 'r', encoding="utf-8") as history_fileobj:
                    history = json.load(history_fileobj)
            except FileNotFoundError:
                continue
            except json.JSONDecodeError:
                logging.error("History file corrupted, attempting to read from backup.")
                continue
            loaded_from = candidate
            break
        if loaded_from is None:
            logging.warning("Could not read history from file.")
            return
        # When the backup itself is the file that has been read, this renames it onto itself.
        os.replace(loaded_from, backup_path)

        self.delivery_status.load(history.get('delivery_status', {}))

        chats = self.chats
        for envelope in history['envelopes']:
            if get_envelope_reaction(envelope):
                chats.add_reaction_envelope(envelope)
            elif get_envelope_remote_delete(envelope):
                chats.add_remote_delete_envelope(envelope)
            else:
                chats.add_envelope(envelope)

        self.unread_counts = UnreadCounts(history.get('unread_counts', {}))
        self.contacts_cache = history.get('contacts_cache', {})
# #############################################################################
# urwid palette
# #############################################################################
# Base display attributes, as `(name, foreground, background)` tuples.
PALETTE = [
    ('bold', 'bold', ''),
    ('italic', 'italics', ''),
    ('bolditalic', 'bold,italics', ''),
    ('strikethrough', 'strikethrough', ''),
]

# Maps an attribute name to the name of its "reversed" counterpart.
REVERSED_FOCUS_MAP = {None: 'reversed'}


def _fill_palette():
    """Add a `reversed_<name>` entry for each base entry of PALETTE, plus the 'reversed' and 'line_focused' entries."""
    reversed_entries = []
    for name, fg, *_rest in PALETTE:
        reversed_name = f'reversed_{name}'
        reversed_entries.append((reversed_name, f'standout,{fg}', ''))
        REVERSED_FOCUS_MAP[name] = reversed_name
    PALETTE.extend([
        *reversed_entries,
        ('reversed', 'standout', ''),
        ('line_focused', 'dark blue', ''),
    ])


_fill_palette()
class Color:
    """Interprets the `color` argument: adds the required entries to PALETTE and picks the palette entry name for a message."""

    SIGNAL_COLORS_PALETTE = [
        ('pink', 'dark magenta', '', None, '#f08', None),
        ('red', 'dark red', '', None, '#f00', None),
        ('orange', 'brown', '', None, '#f60', None),
        ('purple', 'dark magenta', '', None, '#a0f', None),
        ('indigo', 'dark blue', '', None, '#60f', None),
        ('blue_grey', 'brown', '', None, '#680', None),
        ('ultramarine', 'dark blue', '', None, '#06f', None),
        ('blue', 'dark cyan', '', None, '#06a', None),
        ('teal', 'dark cyan', '', None, '#086', None),
        ('green', 'dark green', '', None, '#0a0', None),
        ('light_green', 'dark green', '', None, '#0d0', None),
        ('brown', 'brown', '', None, '#880', None),
        ('grey', 'light gray', '', None, 'g52', None),
    ]
    # The colors are defined in ..?
    # Signal-Android/app/src/main/java/org/thoughtcrime/securesms/contacts/avatars/ContactColorsLegacy.java
    # Signal-Android/app/src/main/res/values/material_colors.xml
    # Using `dark ...` colors, because many terminals show `light ...` as `bold`:
    # "Some terminals also will display bright colors in a bold font even if you don't specify bold."
    # https://urwid.readthedocs.io/en/latest/manual/displayattributes.html#bold-underline-standout

    # The whitespace inside of the pattern is not significant (re.VERBOSE).
    HIGH_COLOR_RE = re.compile(r"""
        \#[0-9A-Fa-f]{3}
        |
        g\#[0-9A-Fa-f]{2}
        |
        g[0-9]{1,3}
        |
        h[0-9]{1,3}
        """, re.VERBOSE)
    # https://urwid.readthedocs.io/en/latest/reference/attrspec.html#urwid.AttrSpec

    def __init__(self, args_color):
        self._args_color = args_color
        self.high_color_mode = False
        self._colors = self._set_color_palette()

    def _exit(self):
        """Terminate the program with a message about the unparsable `color` argument."""
        sys.exit("ERROR: could not parse the `color` argument: " + repr(self._args_color))

    def _is_high_color(self, color_str):
        """Return a match object (truthy) if the whole of `color_str` is a "high-color" (256 colors) value, else `None`."""
        return self.HIGH_COLOR_RE.fullmatch(color_str)

    def _add_palette_entry(self, name, val):
        """Append an entry `name` with the foreground color `val` to PALETTE; a high-color value also turns `high_color_mode` on."""
        if not self._is_high_color(val):
            PALETTE.append((name, val, ''))
            return
        PALETTE.append((name, '', '', None, val, None))
        self.high_color_mode = True

    def _set_color_palette(self):
        """Fill PALETTE according to the `color` argument and return the value to keep in `self._colors`.

        The argument is either `True` or 'high' (use Signal's colors; returned unchanged), or a JSON string: a two-item list of the colors for sent and received messages (the parsed list is returned), or a dict of overrides on top of Signal's colors (a dict of its numeric keys is returned). Anything else ends the program.
        """
        requested = self._args_color
        if requested == 'high':
            self.high_color_mode = True
        if requested is True or requested == 'high':
            PALETTE.extend(self.SIGNAL_COLORS_PALETTE)
            return requested

        try:
            color_spec = json.loads(requested)
        except (TypeError, json.decoder.JSONDecodeError):
            self._exit()

        if isinstance(color_spec, list) and len(color_spec) == 2:
            sent_color, recv_color = color_spec
            self._add_palette_entry('sent_color', sent_color)
            self._add_palette_entry('recv_color', recv_color)
            return color_spec

        if isinstance(color_spec, dict):
            PALETTE.extend(self.SIGNAL_COLORS_PALETTE)
            # Adding a tuple to PALETTE that already has a tuple with the same "name" (i.e. the first item in tuple) overrides the old tuple.
            override_dict = {}
            for key, val in color_spec.items():
                self._add_palette_entry(key, val)
                if is_number(key):
                    override_dict[key] = key  # sic
            return override_dict

        return self._exit()  # `return` just to make pylint happy

    def for_message(self, msg):
        """Return the name of the palette entry to show `msg` with."""
        colors = self._colors
        try:
            return colors[msg.sender_num]
        except (TypeError, KeyError):
            # No override for this sender (or `colors` is not a mapping at all).
            pass
        outgoing = is_envelope_outgoing(msg.envelope)
        if isinstance(colors, list):
            return 'sent_color' if outgoing else 'recv_color'
        if outgoing:
            return 'default'
        try:
            return msg.sender.color
        except (TypeError, AttributeError):
            # In case `sender` is not in `Contacts`
            return 'default'
# #############################################################################
# ui utility
# #############################################################################
def markup_to_text(markup):
    """Return the plain text contained in an urwid text markup.

    This is useful when we have only the markup; if we have the urwid.Text instance, can use its `.text` property instead.
    Not currently used anywhere.

    `markup` can be a string, an `(attribute, markup)` tuple, or a list of markups. The second item of a tuple may itself be a nested markup (e.g. `('bold', ['a', ('italic', 'b')])`), so it is processed recursively rather than returned as is.
    """
    if isinstance(markup, str):
        return markup
    if isinstance(markup, tuple):
        return markup_to_text(markup[1])
    return ''.join(markup_to_text(t) for t in markup)
def get_text_markup(text_widget):
    """Get urwid.Text widget text, in markup format.

    Like urwid.Text.get_text(), but returns a text markup that can be passed on to urwid.Text.set_text() or to urwid.Text() for creating a new text object.

    The `(attribute, run_length)` pairs returned by `get_text()` are converted to `(attribute, substring)` tuples; text left over after the last run is appended as a plain string. If there are no attributes, the plain text is returned.
    """
    text, display_attributes = text_widget.get_text()
    if not display_attributes:
        return text

    markup = []
    start = 0
    for attr, run_len in display_attributes:
        stop = start + run_len
        markup.append((attr, text[start:stop]))
        start = stop
    if start != len(text):
        markup.append(text[start:])
    return markup
def listbox_set_body(listbox, body_new):
    """Replace the body of `listbox` with `body_new`, re-wiring the "modified" signal.

    Can't just do `listbox.body = body_new`:
    https://github.com/urwid/urwid/issues/428
    Does nothing if `body_new` is already the body of `listbox`.
    """
    # pylint: disable=protected-access
    body_old = listbox.body
    if body_new is not body_old:
        urwid.disconnect_signal(body_old, "modified", listbox._invalidate)
        listbox.body = body_new
        urwid.connect_signal(listbox.body, "modified", listbox._invalidate)
class LineBoxHighlight(urwid.WidgetWrap):
    """A LineBox with a centered title that gets the 'line_focused' display attribute (through `focus_map`) while in focus."""

    def __init__(self, w, title=''):
        # Need to set a "default" attribute on the contents, to not color all the contents in `w`
        contents_w = urwid.AttrMap(w, '')
        line_box_w = urwid.LineBox(contents_w, title=title, title_align='center')
        super().__init__(urwid.AttrMap(line_box_w, None, focus_map='line_focused'))
class PopUpBox(urwid.WidgetWrap):
    """A bordered pop-up container that emits the 'closed' signal when dismissed.

    The wrapped `widget` is placed in a LineBox, optionally with a "Close" button below it and with a "shadow" added to the right and at the bottom.
    """

    signals = ['closed']

    def __init__(self, widget, title='', buttons=True, shadow_len=2):
        """Build the pop-up around `widget`.

        widget: the widget displayed inside the box. If it has a 'closed' signal, that signal is re-emitted by this box.
        title: LineBox title. NOTE: it is only passed to the LineBox when `buttons` is true.
        buttons: whether to show the "Close" button in the footer.
        shadow_len: width (in columns) of the shadow; a falsy value disables the shadow.
        """
        self._buttons = buttons
        try:
            # Forward the wrapped widget's own 'closed' signal, if it has one.
            urwid.connect_signal(widget, 'closed', self._emit, user_args=['closed'])
        except NameError:
            # Presumably raised by urwid when `widget` does not register a 'closed' signal -- confirm against urwid's signals documentation.
            pass
        if buttons:
            def handle_click(_button):
                self._emit('closed')
            btn_close = urwid.Padding(urwid.Button('Close', on_press=handle_click), align='center', width=9)
            # The footer (button) has the initial focus; see keypress() for switching the focus with Tab.
            self._frame_w = urwid.Frame(widget, footer=btn_close, focus_part='footer')
            box_w = urwid.LineBox(self._frame_w, title)
        else:
            box_w = urwid.LineBox(widget)
        if shadow_len:
            ### Shadow effect. (Based on urwid/examples/dialog.py)
            # Right-hand side: an extra fixed-width column in 'reversed' attribute.
            box_w = urwid.Columns([
                box_w,
                ('fixed', shadow_len, urwid.AttrWrap(
                    urwid.Filler(
                        urwid.Text(('default', ' '*shadow_len)),
                        "top"
                    ),
                    'reversed'))
            ])
            # Bottom: a footer row in 'reversed' attribute.
            box_w = urwid.Frame(
                box_w,
                footer = urwid.AttrMap(
                    urwid.Padding(
                        urwid.Text(('default', ' '*shadow_len)),
                        align='left',
                    ),
                    'reversed')
            )
        super().__init__(box_w)

    def keypress(self, size, key):
        """Handle the keys not consumed by the wrapped widgets.

        `esc` / `q` emit 'closed'. `tab` / `shift tab` toggle the focus between the body and the "Close" button (when there is one); without buttons these keys are swallowed. Returns `None` for the handled keys, and the key itself otherwise.
        """
        key = super().keypress(size, key)
        if key in ('esc', 'q'):
            self._emit('closed')
        elif key in ('tab', 'shift tab'):
            if self._buttons:
                if self._frame_w.focus_position == 'footer':
                    self._frame_w.focus_position = 'body'
                else:
                    self._frame_w.focus_position = 'footer'
        else:
            return key
        return None
class FocusableText(urwid.WidgetWrap):
    """A Text widget that can take the focus.

    The text is wrapped in an AttrMap using `REVERSED_FOCUS_MAP` as its focus map. Attributes not found on this object are looked up on the underlying urwid.Text widget, so the Text methods (e.g. `set_text()`) can be called on it directly.
    """

    def __init__(self, markup, attr_map=None, **kwargs):
        # `kwargs` are passed on to urwid.Text (e.g. `align`).
        text_w = urwid.Text(markup, **kwargs)
        self._text_w = text_w
        super().__init__(
            urwid.AttrMap(text_w, attr_map, focus_map=REVERSED_FOCUS_MAP)
        )

    def selectable(self):
        # Setting class variable `_selectable = True` does not work. Probably gets overwritten by the base class constructor.
        return True

    def keypress(self, _size, key):
        # When reimplementing selectable(), have to redefine keypress() too.
        # https://urwid.readthedocs.io/en/latest/reference/widget.html#urwid.Widget.selectable
        # No keys are handled here: every key is returned to the caller.
        return key

    def __getattr__(self, attr):
        # Only called when the normal attribute lookup fails: delegate to the Text widget.
        text_w = self._text_w
        return getattr(text_w, attr)
class LazyEvalListWalker(urwid.ListWalker):
    """A ListWalker that creates widgets only as they come into view.

    This ListWalker subclass saves resources by deferring widgets creation until they are actually visible. For large `contents` list, most of the items might not be viewed in a typical usage.

    "If you need to display a large number of widgets you should implement your own list walker that manages creating widgets as they are requested and destroying them later to avoid excessive memory use."
    https://urwid.readthedocs.io/en/latest/manual/widgets.html#list-walkers

    The widget for an item is produced by calling `eval_func(item, position)` every time the item is requested through `__getitem__`.
    """

    def __init__(self, contents, eval_func, init_focus_pos=0):
        """
        contents: a list-like object (must have `__getitem__`) with the items to show.
        eval_func: a callable `(item, position) -> widget`.
        init_focus_pos: the focus position set whenever `contents` is (re)assigned; a negative value counts from the end of `contents`.

        Raises urwid.ListWalkerError if `contents` is not list-like.
        """
        if not getattr(contents, '__getitem__', None):
            raise urwid.ListWalkerError("ListWalker expecting list like object, got: %r" % (contents,))
        # These two have to be set before assigning `self.contents`: the property setter reads `_init_focus_pos`.
        self._init_focus_pos = init_focus_pos
        self._eval_func = eval_func
        self.contents = contents
        super().__init__() # Not really needed, just here to make pylint happy.

    @property
    def contents(self):
        """The list-like object holding the items."""
        return self._contents

    @contents.setter
    def contents(self, contents_new):
        # Detach from the old contents, attach to the new one, and reset the focus to the initial position.
        self._remove_contents_modified_callback()
        self._contents = contents_new
        self._set_contents_modified_callback(self._modified)
        if self._init_focus_pos < 0:
            # Negative initial position: count from the end, but do not go below 0.
            self.focus = max(0, len(self.contents) + self._init_focus_pos)
        else:
            self.focus = self._init_focus_pos
        self._modified()

    def _set_contents_modified_callback(self, callback):
        # Ask `contents` to call `callback` when it is modified. Objects without `set_modified_callback()` (e.g. a plain list) can not do that; only a warning is logged then.
        try:
            self.contents.set_modified_callback(callback)
        except AttributeError:
            logging.warning(
                "Changes to object will not be automatically updated: %s",
                textwrap.shorten(str(self.contents), 150),
            )

    def _remove_contents_modified_callback(self):
        # Replace the callback on the current contents with a no-op.
        # The AttributeError is also raised (and ignored) on the very first assignment of `contents`, when `self._contents` does not exist yet.
        try:
            self.contents.set_modified_callback(noop)
        except AttributeError:
            pass

    def _modified(self):
        if self.focus >= len(self.contents):
            # Making sure that if after some items are removed from `contents` it becomes shorter than the current `focus` position, we don't crash.
            self.focus = max(0, len(self.contents) - 1)
        super()._modified()

    def __getitem__(self, position):
        # The widget is created anew on every access.
        item = self.contents[position]
        widget = self._eval_func(item, position)
        return widget

    def next_position(self, position):
        """Return the position below `position`; raise IndexError at the end of `contents`."""
        if position >= len(self.contents) - 1:
            raise IndexError
        return position + 1

    def prev_position(self, position):
        """Return the position above `position`; raise IndexError at the start of `contents`."""
        if position <= 0:
            raise IndexError
        return position - 1

    def set_focus(self, position):
        """Set the focus to `position`; raise IndexError if it is outside of `contents` (negative positions are not accepted)."""
        if position < 0 or position >= len(self.contents):
            raise IndexError
        self.focus = position
        self._modified()

    def positions(self, reverse=False):
        """Return an iterable over all the positions, in reverse order if `reverse` is true."""
        ret = range(len(self.contents))
        if reverse:
            ret = reversed(ret)
        return ret
class ViBindingsMixin(urwid.Widget):
    """Mixin adding vi-like (and ctrl+n/p) aliases for the motion keys.

    Has to come before the "real" widget class in the list of bases, so that `super().keypress()` reaches that widget.
    """

    # alias -> the key it is translated to
    _KEY_MAP = {
        'h': 'left',
        'j': 'down',
        'k': 'up',
        'l': 'right',
        'g': 'home',
        'G': 'end',
        'ctrl p': 'up',
        'ctrl n': 'down',
    }

    def keypress(self, size, key):
        # Sibling class's keypress() will be called ("cooperative super")
        unhandled = super().keypress(size, key)
        translated = self._KEY_MAP.get(unhandled)
        if not translated:
            return unhandled
        # The key was not consumed, and it is one of the aliases: retry with the key it stands for.
        return super().keypress(size, translated)
class ListBoxPlus(ViBindingsMixin, urwid.ListBox):
    """ListBox plus a few useful features.

    - Vim bindings for common motions: j, k, g, G, ctrl+n/p.
    - Filter visible contents to the items passing test by a given function.
    - Updates to new `contents` are displayed automatically. Fixes an urwid bug (see listbox_set_body function).
    """

    def __init__(self, body=None):
        """`body` is passed on to urwid.ListBox; an empty list is used when it is `None`."""
        if body is None:
            body = []
        super().__init__(body)
        # The full (unfiltered) contents; see filter_contents().
        self._contents_pre_filter = self.contents

    def _get_contents(self):
        # If the body has its own `contents` (e.g. LazyEvalListWalker), return that; otherwise the body itself is the contents.
        try:
            return self.body.contents
        except AttributeError:
            return self.body

    def _set_contents(self, contents_new):
        # This method does not change the self._contents_pre_filter, unlike self._set_contents_pre_filter()
        try:
            self.body.contents = contents_new
        except AttributeError:
            listbox_set_body(self, contents_new)

    def _set_contents_pre_filter(self, contents_new):
        # Setter of the `contents` property: replaces the displayed contents and makes them the new "pre-filter" contents.
        if type(contents_new) is list: # pylint: disable=unidiomatic-typecheck
            # If contents_new is a `list` (not one of the `ListWalker`s), make the new body the same type as the original (e.g. SimpleListWalker)
            # Shouldn't use `if isinstance(contents_new, list)` test: a ListWalker returns `True` for it too.
            contents_new = type(self.contents)(contents_new)
        self._set_contents(contents_new)
        self._contents_pre_filter = self.contents

    contents = property(_get_contents, _set_contents_pre_filter)
    # Would be nice to override the base class's `body` property, so that this class can be easily replaced by any other `ListWalker`s.
    # However, overriding a property which is used in superclass's __init__ seems problematic. Need a way to delay the assignment of property. Maybe something like this is necessary:
    # https://code.activestate.com/recipes/408713-late-binding-properties-allowing-subclasses-to-ove/

    def try_set_focus(self, index, valign=None):
        """Set the focus position to `index`, doing nothing if the position is invalid.

        A negative `index` counts from the end of `contents`. If `valign` is given, it is passed to `set_focus_valign()`; for `index == -1` it defaults to 'bottom'.
        """
        index_orig_arg = index
        if index < 0:
            index = len(self.contents) + index
        try:
            self.focus_position = index
        except IndexError:
            return
        if index_orig_arg == -1 and valign is None:
            valign = 'bottom'
        if valign is not None:
            self.set_focus_valign(valign)

    def filter_contents(self, test_function, scope=None):
        """Remove widgets not passing `test_function`.

        Retain only the items in `self.contents` that return `True` when passed as arguments to `test_function`. Pre-filtered `contents` is stored before filtering and can be restored by running `filter_contents` again with `test_function=None`.

        The `scope` argument specifies the iterable to apply the filter to. By default, the scope is all the pre-filtered items. Passing `scope=self.contents` can be useful to further filter an already filtered contents.
        """
        # Note that if `contents` is modified directly elsewhere in the code while a filter is on, this modification applies only to the filtered contents. So, for instance the code for adding a new MessageWidget to ChatView shouldn't do `self.contents.append()`, but rather `current_chat.append()` (after doing `_set_contents_pre_filter(current_chat)`). That way the new msg will show up after the filter is removed.
        # Alternatively, can do `self._contents_pre_filter.append()`. That should work fine either with filter on or off.
        if scope is None:
            scope = self._contents_pre_filter
        if test_function is None:
            self._set_contents(scope)
        else:
            # Keep the type of the contents object (e.g. SimpleListWalker) for the filtered copy.
            contents_type = type(self.contents)
            matching_widgets = contents_type([w for w in scope if test_function(w)])
            self._set_contents(matching_widgets)

    @property
    def is_filter_on(self):
        """True if the displayed contents is not the pre-filter contents object."""
        return self.contents is not self._contents_pre_filter

    def move_item(self, w, pos, pos_in_prefilter=None):
        """Move the item `w` to position `pos` in the displayed contents.

        When a filter is on, the item is also moved in the pre-filter contents, to `pos_in_prefilter` (defaults to `pos`). Items that are not found are left alone.
        """
        def try_move(seq, w, pos):
            try:
                ind = seq.index(w)
            except ValueError:
                # Widget might be absent from `body` e.g. while doing a search on contacts, or if the contact is 'new' (i.e. not in Contacts yet)
                return
            if ind == pos:
                return
            seq.insert(pos, seq.pop(ind))
        try_move(self.contents, w, pos)
        if self.is_filter_on:
            if pos_in_prefilter is None:
                pos_in_prefilter = pos
            try_move(self._contents_pre_filter, w, pos_in_prefilter)
# #############################################################################
# contacts widgets
# #############################################################################
class ContactWidget(FocusableText):
    """A focusable line of text showing a contact in the contacts list.

    In addition to the contact's name, the line can show a "send failed" mark, the number of unread messages, and can be highlighted. Changing the corresponding property redraws the text.
    """

    SEND_FAILED_MARKUP = ''
    NOTE_TO_SELF_MARKUP = ('italic', ' (Self)')
    GROUP_MARKUP = ('italic', ' [GRP]')
    HIGHLIGHT_MARKUP_ATTR = 'bold'

    def __init__(self, contact):
        self.contact = contact
        self._fail_mark_set = False
        self._highlight = False
        self._unread_count = 0
        self._name_markup = self._get_name_markup()
        super().__init__(self._name_markup)

    def _get_name_markup(self):
        """Return the markup (a list) with the contact's name, followed by a group or a "note to self" tag when applicable."""
        contact = self.contact
        name_markup = [contact.name_or_id]
        if contact.is_group and not cfg.partition_contacts:
            name_markup.append(self.GROUP_MARKUP)
        elif contact.id == cfg.username:
            name_markup.append(self.NOTE_TO_SELF_MARKUP)
        return name_markup

    def _update_markup(self):
        """Rebuild the displayed text from the current state of the widget."""
        parts = []
        if self._fail_mark_set:
            parts += [self.SEND_FAILED_MARKUP, " "]
        if self._unread_count:
            # Moving the " " into the ('bold', ..) element removes the italic in [GRP] when contact selected and unread count shown.
            parts += [('bold', f"({self._unread_count})"), " "]
        if not self._highlight:
            parts.extend(self._name_markup)
        else:
            parts.append((self.HIGHLIGHT_MARKUP_ATTR, self._name_markup))
        self.set_text(parts)

    @property
    def unread_count(self):
        return self._unread_count

    @unread_count.setter
    def unread_count(self, count):
        # Redraw only if the value has actually changed.
        if count != self._unread_count:
            self._unread_count = count
            self._update_markup()

    @property
    def fail_mark_set(self):
        return self._fail_mark_set

    @fail_mark_set.setter
    def fail_mark_set(self, true_false):
        if self._fail_mark_set != true_false:
            self._fail_mark_set = true_false
            self._update_markup()

    @property
    def highlight(self):
        return self._highlight

    @highlight.setter
    def highlight(self, new_val):
        if self._highlight != new_val:
            self._highlight = new_val
            self._update_markup()
class PartitionedContactsListWalker(urwid.SimpleListWalker):
    """Ensure that when `partition_contacts == True` only the ContactWidget objects can be in focus (not the headers or divider widgets).

    If there are no ContactWidget objects at or below the requested position, the focus is left unchanged.
    """

    def set_focus(self, position):
        # Overriding the base class's function to make sure only ContactWidget type objects may be in focus.
        # When the widget at `position` is not a ContactWidget, try the ones below it until we find one or reach the end.
        end = len(self)
        pos = position
        while pos < end:
            # Check that widget is of exactly ContactWidget type, not one of its base classes.
            if type(self[pos]) is ContactWidget:  # pylint: disable=unidiomatic-typecheck
                return super().set_focus(pos)
            pos += 1
        return None

    def set_modified_callback(self, callback):
        # Abstract method, inherited from urwid.MonitoredList; has to be overridden in the concrete class.
        # See base class's docs: urwid.SimpleListWalker.set_modified_callback
        raise NotImplementedError(
            'Use connect_signal(list_walker, "modified", ...) instead.'
        )
class ContactsListWidget(ListBoxPlus):
    """The list of contacts (ContactWidget objects).

    Emits the 'contact_selected' signal, with the contact and the `focus_widget` arguments, when a contact is selected.
    When `cfg.partition_contacts` is set, the list consists of two sections (groups and individual contacts), each with a header.
    """

    signals = ['contact_selected']

    def __init__(self, contacts, chats_data):
        super().__init__(
            urwid.SimpleListWalker([])
            if not cfg.partition_contacts else
            PartitionedContactsListWalker([])
        )
        self._contacts = contacts
        self._chats_data = chats_data
        self._contact_widgets_map = {}  # contact id -> ContactWidget
        self._indiv_header_w = None     # set in update(), only when `partition_contacts` is on
        self.update()

    def _get_sorted_contacts(self):
        """Return the contacts sorted alphabetically or by the time of the last message (most recent first).

        With `partition_contacts` on, returns a tuple of two sorted lists: `(groups, individual_contacts)`.
        """
        def sorter(contact):
            contact_name = contact.name_or_id
            if cfg.contacts_sort_alpha:
                return contact_name.casefold()
            try:
                chat = self._chats_data.chats[contact.id]
                last_msg = chat[-1]
            except (KeyError, IndexError):
                # No chat with this contact: sort after all those that have messages.
                return (0, contact_name.casefold())
            return (-last_msg.local_timestamp, contact_name.casefold())

        if not cfg.partition_contacts:
            return sorted(self._contacts.map.values(), key=sorter)
        else:
            grps = sorted(self._contacts.groups, key=sorter)
            cnts = sorted(self._contacts.indivs, key=sorter)
            return (grps, cnts)

    def update(self):
        """Re-create all the widgets from the contacts data, and move the focus to the top of the list."""
        sorted_contacts = self._get_sorted_contacts()
        if not cfg.partition_contacts:
            self.contents = [ContactWidget(contact) for contact in sorted_contacts]
            self._contact_widgets_map = {w.contact.id: w for w in self.contents}
        else:
            group_contact_widgets = [ContactWidget(contact) for contact in sorted_contacts[0]]
            indiv_contact_widgets = [ContactWidget(contact) for contact in sorted_contacts[1]]
            div_w = urwid.Divider('-')
            group_cont_section_title = urwid.Text(('bold', '~~ Groups ~~'), align='center')
            indiv_cont_section_title = urwid.Text(('bold', '~~ Contacts ~~'), align='center')
            widgets = (
                [group_cont_section_title, div_w]
                + group_contact_widgets
                + [div_w, indiv_cont_section_title, div_w]
                + indiv_contact_widgets
            )
            self._indiv_header_w = indiv_cont_section_title  # Used in _move_contact_top() for getting its index position
            self.contents = widgets
            self._contact_widgets_map = {w.contact.id: w for w in group_contact_widgets + indiv_contact_widgets}
        self._set_all_ws_unread_counts()
        try:
            self._get_current_contact_widget().highlight = True
        except AttributeError:  # no current contact, or it has no widget
            pass
        self.try_set_focus(0)

    def _set_all_ws_unread_counts(self):
        for contact_id, contact_widget in self._contact_widgets_map.items():
            unread_count = self._chats_data.unread_counts.get(contact_id, 0)
            if unread_count:
                contact_widget.unread_count = unread_count

    def update_contact_unread_count(self, contact_id):
        contact_widget = self._contact_widgets_map.get(contact_id)
        if contact_widget is not None:
            # The widget is None if received a msg from a 'new' contact (one not in the read signal-cli's data file)
            contact_widget.unread_count = self._chats_data.unread_counts[contact_id]

    def on_new_message(self, msg):
        contact_widget = self._contact_widgets_map.get(msg.contact_id)
        if not cfg.contacts_sort_alpha and contact_widget is not None:
            self._move_contact_top(contact_widget)

    def on_sending_done(self, envelope, status='sent', _timestamp_adj=None):
        # Show a "send failed" symbol next to the contact, but not if it's the "current" contact (whose chat is opened).
        if status != 'send_failed':
            return
        current_contact = self._chats_data.current_contact
        if current_contact is None:
            # If contacts' update happens while sending, and current_contact no longer in contacts.
            return
        envelope_contact_id = get_envelope_contact_id(envelope)
        if current_contact.id == envelope_contact_id:
            return
        contact_widget = self._contact_widgets_map.get(envelope_contact_id)
        if contact_widget is None:
            # No widget for this contact (e.g. a 'new' contact): nothing to mark.
            return
        contact_widget.fail_mark_set = True

    def _move_contact_top(self, w):
        """Move the widget `w` to the top of the list (or of its section), and focus on it."""
        pos_in_prefilter = None
        if not cfg.partition_contacts:
            pos_new = 0
        else:
            if w.contact.is_group:
                pos_new = 2
            elif not self.is_filter_on:
                pos_new = len(self._contacts.groups) + 5  # 2 for "Groups" header and 3 for "Contacts"
            else:
                pos_new = self.contents.index(self._indiv_header_w) + 2
                pos_in_prefilter = len(self._contacts.groups) + 5
        self.move_item(w, pos_new, pos_in_prefilter)
        self.try_set_focus(pos_new)

    def _get_current_contact_widget(self):
        """Return the widget of the "current" contact (whose chat is opened), or `None` if there is no current contact or it has no widget in the list."""
        current_contact = self._chats_data.current_contact
        if current_contact is None:
            return None
        return self._contact_widgets_map.get(current_contact.id)

    def _get_focused_contact_widget(self):
        focused_contact_w = self.focus
        # NOTE: self.focus can be None e.g. when searching through contacts returns no results.
        if cfg.partition_contacts and not isinstance(focused_contact_w, ContactWidget):
            # Widget in focus is urwid.Text (a header) or urwid.Divider. They are normally not supposed to get the focus, but sometimes may: e.g. after pressing `home`, or after doing a search with `/`, or when there are no other widgets (e.g. no search results).
            return None
        return focused_contact_w

    def _unhighlight_current_contact_widget(self):
        # Remove highlighting from the "current" (not for long) contact's widget.
        try:
            self._get_current_contact_widget().highlight = False
        except AttributeError:  # no current contact, or it has no widget
            pass

    def _select_focused_contact(self, focus_widget=None):
        # The `focus_widget` parameter is passed through from caller to emit_signal. It specifies whether the focus should be set on `input`, `chat` or `contacts` widgets after switching to a new contact.
        focused_contact_w = self._get_focused_contact_widget()
        if focused_contact_w is None:
            return
        contact = focused_contact_w.contact
        focused_contact_w.fail_mark_set = False
        self._unhighlight_current_contact_widget()
        urwid.emit_signal(self, 'contact_selected', contact, focus_widget)
        focused_contact_w.highlight = True

    def _get_current_position(self):
        """Return the position in the list to start from when switching to the next / previous contact.

        That is the position of the current contact's widget; or the focus position, if there is no current contact, it is the one in focus, or its widget is not in the displayed list (e.g. filtered out by a search). Returns `None` if the list is empty.
        """
        current_contact = self._chats_data.current_contact
        # The focus might be on None or on a header widget, that have no `contact` attribute.
        focused_contact_w = self._get_focused_contact_widget()
        focused_contact = None if focused_contact_w is None else focused_contact_w.contact
        if current_contact is not None and current_contact != focused_contact:
            contact_w = self._contact_widgets_map.get(current_contact.id)
            try:
                return self.contents.index(contact_w)
            except ValueError:
                pass
        try:
            return self.focus_position
        except IndexError:
            # The list is empty
            return None

    def select_next_contact(self, reverse=False):
        """Select the contact below (or above, if `reverse`) the current contact. Does nothing at the ends of the list."""
        curr_position = self._get_current_position()
        if curr_position is None:
            return
        try:
            focus_position_new = (
                self.body.next_position(curr_position)
                if not reverse else
                self.body.prev_position(curr_position)
            )
        except IndexError:
            return
        if (cfg.partition_contacts
                and reverse
                and not isinstance(self.contents[focus_position_new], ContactWidget)
                and focus_position_new != 1):
            # Jumping over the `~~ Contacts ~~` header when going up.
            focus_position_new -= 3
        try:
            self.set_focus(focus_position_new, coming_from='below' if reverse else 'above')
        except IndexError:
            return
        self._select_focused_contact()

    def _increment_focused_unread_count(self):
        # NOTE: Does not increment unread count in the status line. However it will be updated after switching to another contact.
        focused_contact_w = self._get_focused_contact_widget()
        if focused_contact_w is None:
            return
        contact_id = focused_contact_w.contact.id
        self._chats_data.unread_counts[contact_id] += 1
        self.update_contact_unread_count(contact_id)

    def keypress(self, size, key):
        """`enter`: select the focused contact and focus on the input line; `right`: select the focused contact; `U`: increment its unread count. Other unhandled keys are returned."""
        key = super().keypress(size, key)
        if key == 'enter':
            self._select_focused_contact(focus_widget='input')
        elif key == 'right':
            self._select_focused_contact()
        elif key == 'U':
            self._increment_focused_unread_count()
        else:
            return key
        return None
class ContactsWindow(urwid.Frame):
    """The contacts list together with a search line, shown in the footer after pressing `/`."""

    def __init__(self, contacts, chats_data):
        self.contacts_list_w = ContactsListWidget(contacts, chats_data)
        self._wsearch = BracketedPasteEdit(('bold', '/ '))
        urwid.connect_signal(self._wsearch, 'postchange', self._on_search_text_changed)
        super().__init__(self.contacts_list_w, footer=None)
        if not cfg.partition_contacts:
            # With `partition_contacts` on, the section headers are a part of the list itself.
            title_w = urwid.Text(('bold', 'Contacts'), align='center')
            self.header = urwid.Pile([title_w, urwid.Divider('-')])

    def _start_search(self):
        self.footer = self._wsearch
        self.focus_position = 'footer'

    def _remove_search(self):
        self._wsearch.set_edit_text('')
        self.focus_position = 'body'
        self.footer = None

    def _on_search_text_changed(self, input_w, _old_text):
        """Filter the contacts list, leaving only the contacts that match the search text. Empty text removes the filter."""
        txt = input_w.get_edit_text()
        if not txt:
            self.contacts_list_w.filter_contents(None)
            return

        def match_test(contact_w):
            try:
                contact = contact_w.contact
            except AttributeError:
                # Keep the `partition_contacts` headers / dividers
                return True
            if txt.casefold() in contact.name_or_id.casefold():
                return True
            # The ids are matched (case sensitively) only for the individual contacts.
            return not contact.is_group and txt in contact.id

        self.contacts_list_w.filter_contents(match_test)

    def keypress(self, size, key):
        key = super().keypress(size, key)
        if key == '/':
            self._start_search()
            return None
        if key == 'enter' and self.focus_position == 'footer':
            # Finish typing the search text: go to the (filtered) list.
            self.focus_position = 'body'
            self.contacts_list_w.try_set_focus(0)
            return None
        if key == 'esc':
            self._remove_search()
            return None
        return key
# #############################################################################
# input line
# #############################################################################
class CommandsHistory:
    """History of the entered commands, navigated with `prev()` / `next()`.

    `_index` is the (non-positive) index of the currently shown history item, counting from the end of the history; 0 means that no history item is shown. The text that was in the input line before the navigation started is stashed, and is given back after stepping forward past the newest item.
    """

    def __init__(self):
        self._history = []
        self._index = 0
        self._stashed_input = None

    def prev(self, curr_input):
        """Return the history item older than the one shown; or `curr_input` if there is none."""
        history = self._history
        input_was_edited = (
            curr_input != self._stashed_input
            and bool(history)
            and curr_input != history[self._index]
        )
        if input_was_edited:
            # This check fixes the following unexpected behavior:
            # Type `:whatev`, press `up` a few times, then delete the input with e.g. `backspace`. Next time the history will be looked up from where it's been left this time.
            self._index = 0
        if self._index == 0:
            self._stashed_input = curr_input
        older = self._index - 1
        if -older > len(history):
            # Already at the oldest item (or the history is empty).
            return curr_input
        self._index = older
        return history[older]

    def next(self, curr_input):
        """Return the history item newer than the one shown; the stashed input after the newest one; or `curr_input` if not navigating the history."""
        if self._index == 0:
            return curr_input
        self._index += 1
        return self._history[self._index] if self._index else self._stashed_input

    def add(self, cmd):
        """Append `cmd` to the history, and reset the navigation."""
        self._history.append(cmd)
        self._index = 0
class BracketedPasteEdit(Edit):
    """An Edit widget that keeps the new lines in the text pasted in "bracketed paste mode".

    Unless created with `multiline=True`, the `enter` key inserts a new line only between the 'begin paste' and 'end paste' keys; otherwise it is returned unhandled. `meta enter` always inserts a new line.
    """

    def __init__(self, *args, multiline=False, **kwargs):
        # Using `multiline=True` in super() and then passing on 'enter' keypress to it. A nicer alternative would be to pass '\n', but Edit does not handle it.
        super().__init__(*args, multiline=True, **kwargs)
        self._multiline_arg = multiline
        self._paste_mode_on = False

    def keypress(self, size, key):
        if key in ('begin paste', 'end paste'):
            self._paste_mode_on = (key == 'begin paste')
            return None
        if key == 'enter':
            inserts_new_line = self._multiline_arg or self._paste_mode_on
            if not inserts_new_line:
                return key
            return super().keypress(size, key)
        if key == 'meta enter':
            # Allow inserting new lines with Alt+Enter. This is not a part of "bracketed paste mode" functionality.
            return super().keypress(size, 'enter')
        return super().keypress(size, key)
class InputLine(BracketedPasteEdit):
    """The input line: typing messages, `:commands` and `/searches`.

    A text starting with `:` is treated as a command: it is executed on `enter`, completed on `tab`, and the commands history is navigated with up / down (ctrl+p / ctrl+n). Any other text (except the one starting with `/`) is sent on `enter` as a message to the current contact.
    """

    def __init__(self, **kwargs):
        self._cmd_history = CommandsHistory()
        self._cmds = None
        self._prompt = ('bold', '> ')  # In urwid's parlance, this is called 'caption'.
        super().__init__(self._prompt, **kwargs)

    def set_cmds(self, cmds):
        """Set the commands object, used for executing and completing the commands."""
        self._cmds = cmds

    def _set_edit_text_move_cursor(self, txt, cursor_pos=-1):
        """Edit.set_edit_text() + Edit.set_edit_pos()

        Like Edit.insert_text(), but instead of adding to the current edit_text, replace it with the provided argument. The cursor is placed at `cursor_pos`; by default (-1) at the end of the text.
        """
        self.set_edit_text(txt)
        if cursor_pos == -1:
            cursor_pos = len(txt)
        self.set_edit_pos(cursor_pos)

    def auto_complete_commands(self, txt):
        """Complete the command name in `txt`, or the file path in the command's argument.

        The possible completions are shown in the status line.
        """
        # See also: there is an autocomplete in rr-/urwid_readline
        splitted_txt = txt.split(' ')
        if len(splitted_txt) > 1:
            self._complete_path(splitted_txt)
        else:
            self._complete_command(txt)

    def _complete_path(self, splitted_txt):
        """Complete a file path in the argument of the command (`splitted_txt` is the input text split on spaces)."""
        path, *messages = split_path(' '.join(splitted_txt[1:]))
        # Check we are trying to complete a path
        if len(messages) > 0 or not is_path(path):
            return
        fullpath = os.path.expanduser(path)
        dirname = os.path.dirname(fullpath)
        if not os.path.isdir(dirname):
            return
        try:
            dir_entries = os.listdir(dirname)
        except OSError:
            # E.g. no permission to read the directory: nothing to complete with.
            return
        possible_paths = [x for x in dir_entries if os.path.join(dirname, x).startswith(fullpath)]
        commonprefix = os.path.commonprefix(possible_paths)
        action_request.set_status_line(
            textwrap.shorten(
                ' | '.join(sorted(possible_paths)),
                width=240,
            ))
        if commonprefix == '':
            return
        completion = os.path.join(os.path.dirname(path), commonprefix)
        if os.path.isdir(os.path.expanduser(completion)) and not completion.endswith('/'):
            completion = completion + '/'
        if ' ' in completion:
            completion = '"' + completion + '"'
        if completion != '':
            self._set_edit_text_move_cursor(splitted_txt[0] + ' ' + completion)

    def _complete_command(self, txt):
        """Complete the name of the command in `txt` (which starts with `:`)."""
        all_commands = [
            cmd
            for cmd in [tupl[0][0] for tupl in self._cmds.cmd_mapping]
            if cmd.lower().startswith(txt[1:].lower())
        ]
        commonprefix = os.path.commonprefix(all_commands)
        action_request.set_status_line('{' + ' | '.join(sorted(all_commands)) + '}')
        if len(all_commands) == 1:
            self._set_edit_text_move_cursor(':' + all_commands[0] + ' ')
        elif commonprefix != '':
            self._set_edit_text_move_cursor(':' + commonprefix)

    def _keypress_cmd_mode(self, key, key_orig, txt):
        """Handle a key press while a command is being typed.

        `key` is the key returned by the superclass's keypress(), `key_orig` is the key originally pressed, `txt` is the current input text. Returns `None` if the key is handled, `key` otherwise.
        """
        # Called when `txt.startswith(':')`
        if key == 'enter':
            if txt.strip() == ":":
                action_request.set_status_line('Command missing after `:`')
                return None
            cmd, *args = txt[1:].split(maxsplit=1)
            self._cmds.exec(cmd, *args)
            self._cmd_history.add(txt)
            self.set_edit_text('')
            self.set_caption(self._prompt)
        elif key == 'tab' and not self.get_edit_text().endswith(' '):
            self.auto_complete_commands(txt)
        elif key_orig in ('up', 'ctrl p'):
            # Since BracketedPasteEdit is based on Edit(multiline=True), the up / down / ctrl+p/n are consumed by the superclass, so need to check `key_orig`, before `super` method call.
            prev_cmd = self._cmd_history.prev(txt)
            self._set_edit_text_move_cursor(prev_cmd)
        elif key_orig in ('down', 'ctrl n'):
            next_cmd = self._cmd_history.next(txt)
            self._set_edit_text_move_cursor(next_cmd)
        else:
            return key
        return None

    def keypress(self, size, key):
        key_orig = key
        key = super().keypress(size, key)
        txt = self.get_edit_text()
        if not txt or txt.isspace():
            self.set_caption(self._prompt)  # restore normal prompt
            return key
        if txt.startswith(('/', ':')):
            self.set_caption('')  # set "prompt" to '/' or ':'
            if key == 'esc':
                self.set_edit_text('')
                self.set_caption(self._prompt)
                return None
        else:
            self.set_caption(self._prompt)

        # Bind readline equivalents
        if key == 'ctrl left':
            return super().keypress(size, 'meta b')
        if key == 'ctrl right':
            return super().keypress(size, 'meta f')
        #if key == 'ctrl backspace':
            # urwid registers 'ctrl backspace' as just 'backspace'.. Use 'ctrl w' or 'meta backspace' instead.
            #return super().keypress(size, 'ctrl w')
        # /end: Bind readline equivalents

        if txt.startswith(':'):
            return self._keypress_cmd_mode(key, key_orig, txt)
        elif key == 'enter':
            if txt.startswith('/'):
                # Not handled here: return the key to the caller.
                return key
            action_request.send_message_curr_contact(txt)
            self.set_edit_text('')
        else:
            return key
        return None
# #############################################################################
# conversation widgets
# #############################################################################
class MessageReactionsWidget(urwid.WidgetWrap):
    """Widget displaying the reactions (emojis) to a message.

    Depending on `cfg.show_inline`, it is either a separate row (a connector symbol plus the padded emojis text), or a plain text widget.
    """

    def __init__(self, emojis_markup, align):
        self._align = align
        if cfg.show_inline:
            init_method = self._init_col_w
        else:
            init_method = self._init_row_w
        # `_text_w` is the widget whose text is changed in update(); `display_w` is the one displayed.
        self._text_w, display_w = init_method(emojis_markup)
        super().__init__(display_w)

    def _init_row_w(self, emojis_markup):
        """Create the widgets for showing the reactions as a row. Returns `(text widget, widget to display)`."""
        is_left = self._align == 'left'
        text_w = urwid.Text(emojis_markup, align=self._align)
        react_pad_w = urwid.Padding(text_w, self._align, width=cfg.wrap_at)
        react_sym_markup = '╰╴' if is_left else '╶╯'
        react_sym_w = urwid.Text(
            react_sym_markup,
            align='right' if is_left else 'left',
        )
        sym_col_width = DeliveryStatus.MARKUP_WIDTH + len(react_sym_markup)
        cols = [(sym_col_width, react_sym_w), react_pad_w]
        if self._align == 'right':
            cols.reverse()
        return text_w, urwid.Columns(cols)

    def _init_col_w(self, emojis_markup):
        """Create the plain text widget with the reactions. Returns it twice: as the text widget and as the widget to display."""
        text_w = urwid.Text(self._col_w_markup(emojis_markup))
        return text_w, text_w

    def _col_w_markup(self, emojis_markup):
        if self._align == 'right':
            return [*emojis_markup, '']
        return ['', *emojis_markup]

    def update(self, emojis_markup):
        """Replace the displayed reactions with `emojis_markup`."""
        new_markup = (
            self._col_w_markup(emojis_markup)
            if cfg.show_inline else
            emojis_markup
        )
        self._text_w.set_text(new_markup)
class MessageWidget(urwid.WidgetWrap):
# Max number of attachment names listed for a message; see _get_attachments_markup_generic()
MAX_ATTACHS_SHOW = 4
# Markups shown in place of the message's text for the special kinds of messages; see _get_message_markup()
TYPING_INDICATOR_MARKUP = '...'
REMOTE_DELETE_MARKUP = ('italic', '[deleted]')
STICKER_MARKUP = ('italic', '[sticker]')
# Formatting char -> name of the display attribute applied to the text enclosed in a pair of these chars
FORMAT_MAP = {'_': 'italic', '*': 'bold', '~': 'strikethrough'}
MENTION_BRACKET_CHAR = chr(31) # arbitrary non-printable char
FORMAT_MENTION = {MENTION_BRACKET_CHAR: 'italic'}
# Compiled in set_formatting_consants()
FORMATTING_RE = None
# Inserted between the non-empty parts of the message's markup; see _get_message_markup()
MARKUP_ELS_SEPARATOR = '\n'
# Width of the sender's name column; see _get_inline_columns()
SENDER_NAME_COL_WIDTH = 12
# Functions that are called with the widget as the argument; the markups they return are put before the message's text. See _get_message_markup()
MSG_MARKUP_PRE = ()
@classmethod
def set_formatting_consants(cls, use_formatting):
    """Set the class-level constants that depend on the configuration.

    use_formatting: if true, the mentions formatting is added to `FORMAT_MAP`; otherwise `FORMAT_MAP` is replaced with `FORMAT_MENTION`, i.e. only the mentions are formatted.

    Also compiles `FORMATTING_RE` from the chars in `FORMAT_MAP`, and sets the constants that depend on `cfg.show_inline` and `cfg.one_sided`.
    """
    if use_formatting:
        cls.FORMAT_MAP.update(cls.FORMAT_MENTION)
    else:
        cls.FORMAT_MAP = cls.FORMAT_MENTION
    # The first group of the regex captures the opening format char; the match ends at the next occurrence of the same char.
    cls.FORMATTING_RE = re.compile(
        # Match text like "_italicised_", where "_" is a char in FORMAT_MAP
        rf"""
(
[{''.join(cls.FORMAT_MAP.keys())}]
)
#.+? # bad with doubled format chars, e.g. ~~this~~
#[^\1]+ # can't use backreferences in character class
(?:
(?!\1). # consume a char and check it's not a format char
)+
\1
""",
        re.VERBOSE)
    if cfg.show_inline == 'columns':
        if not cfg.one_sided:
            cls.SENDER_NAME_COL_WIDTH = 'pack'
    else:
        # No separate columns for the time and the sender's name: their markups are put before the message's text instead (see MSG_MARKUP_PRE).
        cls._get_inline_columns = lambda self: []
        cls.MSG_MARKUP_PRE = (
            cls._get_time_markup,
            cls._get_sender_markup,
        )
    if cfg.show_inline == 'wrap':
        cls.MARKUP_ELS_SEPARATOR = ' '
def __init__(self, msg):
    """Create the widget for the message `msg`.

    The widget is a row of columns: the delivery status, the optional "inline" columns (see _get_inline_columns()), and the message's text. Outgoing messages are aligned to the right (the order of the columns is reversed), unless `cfg.one_sided` is set.
    """
    self.msg = msg
    if is_envelope_outgoing(self.msg.envelope) and not cfg.one_sided:
        self.align = 'right'
    else:
        self.align = 'left'

    # urwid.Text throws an error if given an empty list for markup. Not sure `msg_markup` can ever end up being empty though.
    msg_markup = self._get_message_markup() or ''
    self._text_w = FocusableText(msg_markup, align=self.align)
    msg_pad_w = urwid.Padding(self._text_w, self.align, width=cfg.wrap_at)

    self._status_w = urwid.Text(self._get_status_markup(), self.align)
    status_filler_w = urwid.Filler(
        self._status_w,
        'top' if self.align == 'left' else 'bottom',
    )

    cols = [
        (DeliveryStatus.MARKUP_WIDTH, status_filler_w),
        *self._get_inline_columns(),
        msg_pad_w,
    ]
    right_aligned = self.align == 'right'
    if right_aligned:
        cols.reverse()
    # The status column (a box widget) is the first one, or the last one after reversing.
    status_col_index = len(cols) - 1 if right_aligned else 0
    self._columns_w = urwid.Columns(cols, dividechars=1, box_columns=[status_col_index])

    self._color = cfg.color.for_message(msg) if cfg.color else None
    super().__init__(
        urwid.AttrMap(self._columns_w, self._color, focus_map=REVERSED_FOCUS_MAP)
    )

    self._reactions_w = None
    self.update_reactions_w()
    if cfg.show_message_padding:
        self._add_pile_row(urwid.Divider(cfg.show_message_padding))
def _get_inline_columns(self):
return [
(width,
urwid.Text(markup)
if width == 'pack' else
urwid.Padding(
urwid.Text(markup, wrap='clip'),
align='right',
width='pack',
)
)
for width, markup in (
('pack', self._get_time_markup()),
(self.SENDER_NAME_COL_WIDTH, self._get_sender_markup()),
) if markup is not None
]
def _get_message_markup(self):
markups = []
if self.MSG_MARKUP_PRE:
# Show sender's name and message timesamp on the same line if both --show-names --show-message-time are true.
markups_list = [f(self) for f in self.MSG_MARKUP_PRE]
for elm in markups_list if self.align == 'left' else reversed(markups_list):
if elm:
markups.extend((*elm, ' '))
markups = [markups]
if 'typingMessage' in self.msg.envelope:
markups.append(
[self.TYPING_INDICATOR_MARKUP],
)
elif self.msg.envelope.get('callMessage') is not None:
markups.append(
self._get_call_message_markup(),
)
elif getattr(self.msg, 'remote_delete', None):
markups.append(
[self.REMOTE_DELETE_MARKUP],
)
elif get_envelope_sticker(self.msg.envelope) is not None:
markups.append(
[self.STICKER_MARKUP],
)
else:
markups.extend([
self._get_quote_markup(),
self._get_text_markup(),
self._get_attachments_markup(),
])
ret = []
for markup in markups:
if markup:
if ret:
ret.append(self.MARKUP_ELS_SEPARATOR)
ret.extend(markup)
return ret
@classmethod
def _get_text_markup_generic(cls, text, mentions):
    """Convert `text` into urwid markup.

    Returns None for empty text, and `[text]` unchanged when formatting is
    disabled and there are no mentions. Otherwise mentions are substituted
    into the text and every `FORMATTING_RE` match becomes an
    `(attribute, inner_text)` tuple with its delimiter characters removed.
    """
    if not text:
        return None
    if not cfg.use_formatting and not mentions:
        return [text]
    if mentions:
        text = Message.text_w_mentions_generic(
            text,
            mentions,
            bracket_char=cls.MENTION_BRACKET_CHAR,
        )
    markup = []
    cursor = 0
    for found in cls.FORMATTING_RE.finditer(text):
        start, end = found.span()
        if start != cursor:
            # Do not add empty strings. Urwid breaks on markup like:
            # [.., ('bold', 'txt1'), '', ('bold', 'txt2'), ...]
            markup.append(text[cursor:start])
        delimited = found.group()
        markup.append((cls.FORMAT_MAP[found[1]], delimited[1:-1]))
        cursor = end
    if cursor != len(text):
        markup.append(text[cursor:])
    return markup
def _get_text_markup(self):
    """Return the markup for this message's own text and mentions."""
    body_text = get_envelope_msg(self.msg.envelope)
    return self._get_text_markup_generic(body_text, self.msg.mentions)
@classmethod
def _get_attachments_markup_generic(cls, attachments):
    """Return markup listing attachment names, or None if there are none.

    At most `MAX_ATTACHS_SHOW` names are listed; the rest are summarized as
    a count.
    """
    if not attachments:
        return None
    limit = cls.MAX_ATTACHS_SHOW
    names = [get_attachment_name(item) for item in attachments]
    if len(attachments) > limit:
        names = names[:limit] + [f'... ({len(attachments)-cls.MAX_ATTACHS_SHOW} more)']
    return ['[attached: ', ('italic', ', '.join(names)), ']']
def _get_attachments_markup(self):
return self._get_attachments_markup_generic(self.msg.attachments)
def _get_time_markup(self):
    """Return the italic timestamp markup, or None when `cfg.show_message_time` is not set."""
    time_format = cfg.show_message_time
    if time_format:
        return [('italic', strftimestamp(self.msg.timestamp, time_format))]
    return None
def _get_sender_markup(self):
    """Return the sender-name markup, or None when the name should not be shown.

    Names are shown for group messages or when `cfg.show_names` is set. For
    outgoing messages the name is 'Me', shown only with `cfg.show_names` or
    in one-sided 'columns' inline mode.
    """
    envelope = self.msg.envelope
    in_group = is_envelope_group_message(envelope)
    if not in_group and not cfg.show_names:
        return None
    if not is_envelope_outgoing(envelope):
        name = action_request.get_contact_name(self.msg.sender_num)
    elif cfg.show_names or (cfg.show_inline == 'columns' and cfg.one_sided):
        name = 'Me'
    else:
        return None
    return [('bolditalic', name)]
def _get_quote_markup(self):
    """Return markup for the message quoted by this message, or None.

    None is returned when the envelope carries no quote, or (after logging
    an error) when the quote lacks the `author`, `text` or `attachments`
    key. The quoted text is shortened to 70 characters.
    """
    quote = get_envelope_quote(self.msg.envelope)
    if not quote:
        return None
    try:
        quote_author_num = quote['author']
        # The `text` key may be present with a None value (NOTE(review): presumably
        # for quotes of text-less messages; confirm against signal-cli output).
        # `textwrap.shorten(None)` raises AttributeError, which the `except` below
        # would not catch.
        quote_text = textwrap.shorten(quote['text'] or '', 70)
        quote_attachments = quote['attachments']
    except KeyError:
        logging.error("Failed to extract a quote from %s", self.msg.envelope)
        return None

    if cfg.show_inline:
        ret = ['|> ']
        sep = ' '
    elif self.align == 'left':
        ret = ['| ']
        sep = self.MARKUP_ELS_SEPARATOR + '| '
    else:
        ret = []
        sep = ' |' + self.MARKUP_ELS_SEPARATOR

    text_markup = self._get_text_markup_generic(
        text=quote_text,
        mentions=quote.get('mentions'),
    ) or []
    # Only plain-string elements are rewritten; (attribute, text) tuples are kept as they are.
    text_markup = [
        element.replace('\n', sep) if isinstance(element, str) else element
        for element in text_markup
    ]

    ret.append((
        'bolditalic',
        action_request.get_contact_name(quote_author_num),
    ))
    for part in (
            text_markup,
            self._get_attachments_markup_generic(quote_attachments),
    ):
        if part:
            ret.append(sep)
            ret.extend(('italic', el) for el in part)

    closes_on_right = self.align == 'right' and cfg.show_inline != 'columns'
    if closes_on_right or cfg.show_inline == 'wrap':
        ret.append(' |')
    return ret
def _get_call_message_markup(self):
call_message = self.msg.envelope['callMessage']
if 'offerMessage' in call_message:
return ['📞 ', ('italic', 'Incoming call')]
elif 'answerMessage' in call_message:
return [('italic', 'Calling'), ' 📞']
elif get_nested(call_message, 'hangupMessage', 'type') == 'NORMAL':
# For accepted calls, `type: "ACCEPTED"`
return ['📞 ', ('italic', 'Hung up')]
return None
def _get_status_markup(self):
    """Look up the markup for the message's current delivery status."""
    current_status = self.msg.delivery_status
    return DeliveryStatus.MARKUP_MAP[current_status]
def update_status(self):
    """Re-render the delivery status widget from the message's current status."""
    self._status_w.set_text(self._get_status_markup())
def reload_markup(self):
    """Regenerate the message markup and put it into the text widget."""
    fresh_markup = self._get_message_markup()
    # An empty markup list is replaced with an empty string.
    self._text_w.set_text(fresh_markup or '')
def highlight(self):
    """Apply the reversed (focus) attribute map to the widget's normal display."""
    attr_map_w = self._w
    attr_map_w.set_attr_map(REVERSED_FOCUS_MAP)
def unhighlight(self):
    """Restore the widget's regular color as its attribute map."""
    regular_map = {None: self._color}
    self._w.set_attr_map(regular_map)
def _add_pile_row(self, row_w):
    """Add `row_w` as a new row below the message, wrapping the display in a Pile if it is not one yet."""
    current_w = self._w.original_widget
    if not isinstance(current_w, urwid.Pile):
        self._w.original_widget = urwid.Pile([current_w, row_w])
        return
    current_w.contents.append((row_w, current_w.options()))
def _remove_pile_row(self, row_w):
    """Remove `row_w` from the Pile; unwrap the Pile when a single row is left.

    Does nothing if the display is not a Pile or `row_w` is not one of its rows.
    """
    pile_w = self._w.original_widget
    if not isinstance(pile_w, urwid.Pile):
        return
    rows = pile_w.contents
    try:
        rows.remove((row_w, pile_w.options()))
    except ValueError:
        return
    if len(rows) == 1:
        remaining_w, _opts = rows[0]
        self._w.original_widget = remaining_w
def _insert_column(self, pos, w, col_opts=('pack',)):
if pos == 'last':
pos = len(self._columns_w.contents)
self._columns_w.contents.insert(pos, (w, self._columns_w.options(*col_opts)))
def _remove_column(self, w=None, pos=None):
cols = self._columns_w.contents
if w is not None:
for pos, (widget, _opts) in enumerate(cols): # pylint: disable=redefined-argument-from-local
if widget is w:
break
else:
raise ValueError(f"Column with widget {w} not found")
del cols[pos]
def update_reactions_w(self):
    """Synchronize the reactions widget with the message's reactions.

    Does nothing for messages without a `reactions` attribute. Removes the
    widget when there are no non-removed reactions; otherwise updates the
    existing widget or creates one.
    """
    try:
        reactions = self.msg.reactions
    except AttributeError:
        return
    parsed = (get_envelope_reaction(envelope) for envelope in reactions.values())
    emojis_markup = [
        reaction['emoji'] for reaction in parsed
        if not reaction.get('isRemove')
    ]
    if not emojis_markup:
        self._remove_reactions_w()
        return
    try:
        self._reactions_w.update(emojis_markup)
    except AttributeError:
        # No reactions widget exists yet (it is None).
        self._add_reactions_w(emojis_markup)
def _add_reactions_w(self, emojis_markup):
    """Create the reactions widget and attach it to the message.

    In inline mode it becomes a column at the outer end of the row
    (last for left-aligned messages, first otherwise); otherwise it is
    added as a row below the message.
    """
    self._reactions_w = MessageReactionsWidget(emojis_markup, self.align)
    if cfg.show_inline:
        position = 'last' if self.align == 'left' else 0
        self._insert_column(position, self._reactions_w, ('weight', 0.5))
    else:
        self._add_pile_row(self._reactions_w)
def _remove_reactions_w(self):
if self._reactions_w is None:
return
call_method = self._remove_column if cfg.show_inline else self._remove_pile_row
call_method(self._reactions_w)
self._reactions_w = None
class MessageWidgetsCache:
    """Create and cache widgets for LazyEvalMessageListWalker"""

    def __init__(self):
        self._cache = {}
        #self._cache = weakref.WeakValueDictionary()
        # Using a weak reference dictionary would save memory, but at the cost of using cpu to (re)create MessageWidget objects after switching back and forth between the chats.

    def get(self, msg, _position=None):
        """Return the cached widget for `msg`, creating and caching it on first use."""
        key = self._hash(msg)
        # Not using
        #   return self._cache.setdefault(key, MessageWidget(msg))
        # because it would (re)create a new MessageWidget(msg) obj every time, even if it's already in the cache.
        if key not in self._cache:
            self._cache[key] = MessageWidget(msg)
        return self._cache[key]

    @staticmethod
    def _hash(msg):
        """Cache key of a message: hash of its sender number and timestamp."""
        identity = (msg.sender_num, msg.timestamp)
        return hash(identity)

    def on_delivery_status_changed(self, timestamp, _status):
        """Refresh the status of the cached widget for our own message sent at `timestamp`."""
        key = hash((cfg.username, timestamp))
        if key not in self._cache:
            # This is not necessarily an error:
            # Happens when the msg's delivery status is set before the message widget is created. For instance, when status = sending, or before the chat is opened and the widgets for it are created.
            return
        self._cache[key].update_status()

    def adjust_timestamp(self, msg, timestamp_adj):
        """Save memory by purging entry with old timestamp from cache.
        Also, saves cpu by not re-creating new MessageWidgets.
        """
        # This method would not be needed if self._cache were a weakref dictionary.
        old_key = self._hash(msg)
        new_key = hash((msg.sender_num, timestamp_adj))
        # Theoretically, it's possible to get a race condition here if signal-cli returns adjusted timestamp before the msg with un-adjusted timestamp is added to the _cache.
        if old_key in self._cache:
            self._cache[new_key] = self._cache.pop(old_key)
class LazyEvalMessageListWalker(LazyEvalListWalker):
    """List walker whose widgets are supplied by a `MessageWidgetsCache`."""

    def __init__(self, contents, init_focus_pos=-1):
        widgets_cache = MessageWidgetsCache()
        self.msg_ws_cache = widgets_cache
        super().__init__(contents, widgets_cache.get, init_focus_pos)
class ChatView(ListBoxPlus):
    """List box with the messages of the current chat; handles search filtering and per-message keys."""

    signals = ['pick_reaction']

    def __init__(self):
        walker = LazyEvalMessageListWalker(urwid.MonitoredList())
        super().__init__(walker)

    def _update_search_results(self, txt, old_txt=''):
        """Filter the shown messages to those whose text contains `txt`, ignoring case."""
        if not txt:
            return
        # Incremental search: only search through the current search results, rather than the whole chat.
        scope = self.contents if old_txt in txt else None

        def test_match(msg):
            msg_text = msg.text
            if not msg_text:
                return None
            return txt.casefold() in msg_text.casefold()

        self.filter_contents(test_match, scope)
        self.try_set_focus(-1)

    def _reset_search(self, keep_curr_focused=False):
        """Restore the pre-search contents.
        If keep_curr_focused is false, the focus is restored to the widget that was in focus before the search was started.
        Otherwise, place the focus on the same message that was in focus before the search is removed.
        """
        focused_before_reset = self.focus
        self.filter_contents(None)
        if not keep_curr_focused:
            return
        position = self.contents.index(focused_before_reset.msg)
        self.try_set_focus(position)

    def on_input_line_change(self, input_line_w, old_text):
        """Input line 'postchange' handler: text starting with '/' drives the search."""
        txt = input_line_w.get_edit_text()
        if txt.startswith('/'):
            self._update_search_results(txt[1:], old_text[1:])
        elif self.is_filter_on:
            self._reset_search()

    def _delete_message(self, message_widget):
        """Request deletion of the focused message."""
        index = None if self.is_filter_on else self.focus_position
        action_request.delete_message_prompt(message_widget, index)
        if self.is_filter_on:
            del self.contents[self.focus_position]

    def _resend_message(self, msg):
        """Request re-sending of `msg` and focus the last message."""
        saved_position = self.focus_position  # Saving it because it'll shift after resend_message().
        index = None if self.is_filter_on else saved_position
        try:
            action_request.resend_message(msg, index)
        except TypeError:
            return
        if self.is_filter_on:
            del self.contents[saved_position]
            # The `_contents_pre_filter` for this class always points to the `current_chat` list. So after `resend()` action, its last element is the new message.
            self.contents.append(self._contents_pre_filter[-1])
        self.try_set_focus(-1)

    def _focus_quoted_msg_w(self, envelope):
        """Move the focus to the message quoted in `envelope`, if it can be located."""
        quote = get_envelope_quote(envelope)
        if quote is None:
            return
        try:
            quoted_position = self.contents.index_ts(
                quote['id'],  # timestamp of orig message
                quote['author'],
            )
        except (KeyError, ValueError):
            return
        self.try_set_focus(quoted_position)

    def keypress(self, size, key):
        """Handle the keys acting on the focused message; return unhandled keys."""
        key = super().keypress(size, key)
        focused_w = self.focus
        if focused_w is None:
            return key
        envelope = focused_w.msg.envelope

        if key in ('enter', 'right'):
            if self.is_filter_on:
                self._reset_search(keep_curr_focused=True)
            elif get_envelope_msg(envelope) is not None:
                opened = action_request.open_attach(envelope) or action_request.open_urls(envelope)
                if not opened and get_envelope_quote(envelope):
                    self._focus_quoted_msg_w(envelope)
        elif key == 'o':
            _ = action_request.open_urls(envelope) or action_request.open_attach(envelope)
        elif key == 'y':
            to_copy = get_envelope_msg(envelope)
            if not to_copy:
                # No text: copy the paths of the attachments instead.
                attachments = get_envelope_attachments(envelope)
                if attachments is not None:
                    to_copy = ' '.join(get_attachment_path(attach) for attach in attachments)
            action_request.copy_to_clipb(to_copy)
        elif key == 'd':
            self._delete_message(focused_w)
        elif key == 'r':
            self._resend_message(focused_w.msg)
        elif key == 'D':
            action_request.send_remote_delete_prompt(focused_w)
        elif key == 'q':
            # Replying / quoting not supported by signal-cli
            # https://github.com/AsamK/signal-cli/issues/213
            pass
        elif key in ('R', 'e') and not focused_w.msg.not_repliable:
            urwid.emit_signal(self, 'pick_reaction', size, self.calculate_visible(size, True))
        else:
            return key
        return None
class ChatWindow(urwid.Frame):
    """Frame with the chat title (header), the chat view (body) and the input line (footer)."""

    def __init__(self):
        self._title_widget = urwid.Text('', align='center')
        self.input_line_w = InputLine()
        self.chat_view = ChatView()
        header_w = urwid.Pile([self._title_widget, urwid.Divider('-')])
        footer_w = urwid.Pile([urwid.Divider('-'), self.input_line_w])
        # Maps the names used by callers to the Frame's focus positions.
        self._focusable_widgets = {'chat': 'body', 'input': 'footer'}
        super().__init__(self.chat_view, header=header_w, footer=footer_w)
        urwid.connect_signal(self.input_line_w, 'postchange', self.chat_view.on_input_line_change)

    @property
    def focus_widget_name(self):
        """Name ('chat' or 'input') of the focused part, or None if neither is in focus."""
        return next(
            (
                name for name, position in self._focusable_widgets.items()
                if position == self.focus_position
            ),
            None,
        )

    @focus_widget_name.setter
    def focus_widget_name(self, widget_name):
        self.focus_position = self._focusable_widgets[widget_name]

    def set_title(self, contact):
        """Show the contact's name with its number, or for a group its member names, in the title."""
        name = contact.name_or_id
        markup = [('bold', name)]
        if contact.is_group:
            memb_names = [memb.name_or_id for memb in contact.member_contacts]
            markup.extend([
                ' (',
                textwrap.shorten(', '.join(memb_names), 80),
                ', ' if memb_names else 'only: ',
                ('italic', 'You'),
                ')',
            ])
        else:
            num = contact.number
            if name != num:
                markup.extend([' (', num, ')'])
        self._title_widget.set_text(markup)

    def on_contact_selected(self, contact):
        """Update the title for `contact` and focus the last message of the chat."""
        self.set_title(contact)
        self.chat_view.try_set_focus(-1)

    def keypress(self, size, key):
        """Handle `esc` and `enter` while the input line holds a search (text starting with '/')."""
        key = super().keypress(size, key)
        searching = self.input_line_w.edit_text.startswith('/')
        if not searching:
            return key
        if key == 'esc':
            return self.input_line_w.keypress(size, key)
        if not (key == 'enter' and self.focus_widget_name == 'input'):
            return key
        if not self.chat_view.is_filter_on:
            # This clause is used for re-doing a search on a new chat contents after switching to a new contact.
            urwid.emit_signal(self.input_line_w, 'postchange', self.input_line_w, '/')
        if self.chat_view.contents:
            self.focus_widget_name = 'chat'
        return None
# #############################################################################
# MainWindow
# #############################################################################
class StatusLine(urwid.WidgetWrap):
    """Bottom line of the window: status text plus the unread counter, or a one-line input prompt."""

    def __init__(self, unread_count=0):
        self._text = urwid.Text('')
        self._unreads_widget = urwid.Text(self._unread_markup(unread_count))
        self._status_cols = urwid.Columns([self._text, ('pack', self._unreads_widget)], dividechars=1)
        self._prompt = None
        self._prompt_response_callback = None
        placeholder = urwid.WidgetPlaceholder(self._status_cols)
        super().__init__(placeholder)

    @staticmethod
    def _unread_markup(count):
        """Return the counter markup: labelled bold count, or empty text when `count` is zero.

        Shared by `__init__` and `set_unread_count` so that the counter looks
        the same before and after its first update.
        """
        if not count:
            return ''
        return [
            "Unread messages count: ",
            ('bold', f"{count}"),
        ]

    def set_text(self, new_text, append=False):
        """Set the status text; with `append`, add `new_text` on a new line after the current text."""
        if append:
            curr_markup = get_text_markup(self._text)
            if curr_markup:
                new_text = [curr_markup, '\n', new_text]  # urwid.Text does not mind nested lists
        self._text.set_text(new_text)

    def set_unread_count(self, count):
        """Display the total unread messages count (nothing is shown for zero)."""
        self._unreads_widget.set_text(self._unread_markup(count))

    def show_prompt(self, text, callback):
        """Replace the status line with an edit prompt captioned `text`.

        `callback` is called with the entered text on `enter`, or with None on `esc`.
        """
        self._prompt_response_callback = callback
        self._prompt = urwid.Edit(caption=text)
        self._w.original_widget = self._prompt

    def keypress(self, size, key):
        # Keypresses are passed to this widget only when it has focus, which only happens when the prompt is on.
        key = super().keypress(size, key)
        if key == 'enter':
            self._prompt_response_callback(self._prompt.edit_text)
        elif key == 'esc':
            self._prompt_response_callback(None)
        else:
            return key
        # The prompt has been answered or cancelled: show the regular status line again.
        self._w.original_widget = self._status_cols
        return None
class MessageInfo(ListBoxPlus):
    """List of a message's details: sender, dates, text, status, links, attachments, reactions."""

    class OpenPath(FocusableText):
        """Open-able text: file or URL"""

        def __init__(self, text, *args, fpath=None, **kwargs):
            super().__init__(text, *args, **kwargs)
            self.fpath = fpath

        def get_path(self):
            """Return the file path if one is set, otherwise the widget's text."""
            return self.fpath or self.text

        def open_path(self):
            """Open the file at `fpath` if set, otherwise open the widget's text as a URL."""
            if self.fpath:
                return action_request.open_file(self.fpath)
            return action_request.open_url(self.text)

    def __init__(self, msg):
        self._msg = msg
        name_w = self._prop_val_w(
            'Sender',
            action_request.get_contact_name(msg.sender_num),
        )
        num_w = self._prop_val_w('Number', msg.sender_num)
        date = strftimestamp(msg.timestamp)
        date_w = self._prop_val_w('Date', date)
        items = [name_w, num_w, date_w]
        # Compare the timestamps by value: `is not` on two equal ints can still be
        # true, which would add a redundant 'Received' line.
        if msg.local_timestamp != msg.timestamp:
            received_timestamp = strftimestamp(msg.local_timestamp)
            items.append(
                self._prop_val_w('Received', received_timestamp)
            )
        if msg.text:
            txt_w = self._prop_val_w('Message', msg.text)
            items.append(txt_w)
        delivery_status_w = self._get_delivery_status_w()
        if delivery_status_w:
            items.append(delivery_status_w)
        items.append(urwid.Divider())
        if msg.text:
            urls = get_urls(msg.text)
            if urls:
                items.extend(self._get_urls_ws(urls))
        if msg.attachments:
            items.extend(self._get_attachments_ws(msg.attachments))
        sticker = get_envelope_sticker(msg.envelope)
        if sticker:
            items.append(self._get_sticker_w(sticker))
        reactions = getattr(msg, 'reactions', None)
        if reactions is not None:
            items.extend(self._get_reactions_ws(reactions))
        if cfg.debug:
            items.extend(self._get_debug_info())
        super().__init__(items)

    @staticmethod
    def _prop_val_w(prop_name, prop_val):
        """Return a focusable `Property : value` line with the property name in bold."""
        padding_width = 8
        prop_name_str = prop_name.ljust(padding_width) + ': '
        return FocusableText([
            ('bold', prop_name_str),
            prop_val
        ])

    def _get_delivery_status_w(self):
        """Return the 'Status' line, or None if the message has no status string."""
        status_detailed = self._msg.delivery_status_detailed
        status_str = status_detailed.str
        if not status_str:
            return None
        status_when = ''
        if status_detailed.when:
            # Format the time only when there is one to show.
            when_str = strftimestamp(status_detailed.when, strformat='%H:%M:%S %Y-%m-%d')
            status_when = f' ({when_str})'
        return self._prop_val_w('Status', status_str + status_when)

    def _get_urls_ws(self, urls):
        """Return a 'Links' heading followed by one open-able widget per URL."""
        header_w = urwid.Text([('bold', 'Links')], align='center')
        return [header_w, *(self.OpenPath(url) for url in urls)]

    def _get_attachments_ws(self, attachments):
        """Return an 'Attachments' heading followed by one open-able widget per attachment."""
        header_w = urwid.Text(('bold', 'Attachments'), align='center')
        ret = [header_w]
        for atch in attachments:
            atch_w = self.OpenPath(
                text=get_attachment_name(atch),
                fpath=get_attachment_path(atch)
            )
            ret.append(atch_w)
        return ret

    def _get_sticker_w(self, sticker):
        """Return an open-able `Sticker : path` line for the sticker's file."""
        file_path = get_sticker_file_path(sticker)
        sticker_w = self.OpenPath(
            text=get_text_markup(self._prop_val_w('Sticker', file_path)),
            fpath=file_path,
        )
        return sticker_w

    @staticmethod
    def _get_reactions_ws(reactions):
        """Return a 'Reactions' heading and one line per non-removed reaction, or [] if there are none."""
        heading_w = urwid.Text([('bold', 'Reactions')], align='center')
        ret = [heading_w]
        for sender_num, envelope in reactions.items():
            sender_name = action_request.get_contact_name(sender_num)
            reaction = get_envelope_reaction(envelope)
            if reaction.get('isRemove'):
                continue
            ret.append(FocusableText([
                sender_name,
                ': ',
                reaction['emoji'],
                ' (',
                strftimestamp(get_envelope_time(envelope)),
                ')',
            ]))
        if len(ret) == 1:
            # Only the heading: every reaction has been removed.
            return []
        return ret

    def _get_debug_info(self):
        """Return widgets showing the pretty-printed envelope of the message."""
        ret = [
            urwid.Divider(),
            urwid.Text(('bold', 'Debug info'), align='center'),
            urwid.Text('Envelope', align='center'),
            FocusableText(pprint.pformat(self._msg.envelope, width=-1)),
        ]
        return ret

    def keypress(self, size, key):
        """`y` copies the focused line's path or value; `enter` / `o` opens it if it is open-able."""
        key = super().keypress(size, key)
        item = self.body[self.focus_position]
        if key == 'y':
            try:
                action_request.copy_to_clipb(item.get_path())
            except AttributeError:
                # Not an OpenPath item.
                markup = get_text_markup(item)
                if len(markup) == 2 and isinstance(markup[0], tuple):
                    # The line is `Property : Value` type
                    action_request.copy_to_clipb(markup[1])
                else:
                    action_request.copy_to_clipb(item.text)
        elif key in ('enter', 'o'):
            try:
                item.open_path()
            except AttributeError:
                pass
        else:
            return key
        return None
class HelpDialog(ListBoxPlus):
    """Help pop-up contents: a pointer to the README and a button that opens it."""

    def __init__(self):
        def open_readme(_button):
            action_request.open_file(SCLI_README_FILE)

        readme_btn = urwid.Button('Open README', on_press=open_readme)
        btn_width = len(readme_btn.label) + 4
        items = [
            urwid.Divider(),
            urwid.Text(
                "Please see README for the full list of commands and key bindings.",
                align='center',
            ),
            urwid.Divider(),
            urwid.Padding(readme_btn, align='center', width=btn_width),
        ]
        super().__init__(items)
class ReactionPicker(ViBindingsMixin, urwid.WidgetWrap):
    """Grid of emoji buttons plus a free-form emoji input, used to react to a message."""

    signals = ['closed']

    # Currently doing a simple test that the input text is a single non-word or punctuation char.
    _emoji_regex = re.compile(
        r'[^\w\s!"#$%&\'()*+,-./:;<=>?@[\]\\^_`{|}~]'  # from `string.punctuation`
    )

    class EditEmoji(urwid.Edit):
        """Edit box that emits 'return' with its text on `enter` and passes `j`, `k`, `q` through."""

        signals = ['return']

        def keypress(self, size, key):
            if key == 'enter':
                urwid.emit_signal(self, 'return', self, self.edit_text)
                return None
            if key in ('j', 'k', 'q'):
                # Not typed into the edit box; returned as unhandled.
                return key
            return super().keypress(size, key)

    def __init__(self, msg):
        self._msg = msg
        preset_emojis = (
            '💗',
            '👍',
            '👎',
            '😂',
            '😮',
            '😥',
        )
        grid_items = [
            urwid.Button(
                label=emoji,
                on_press=self._reaction_picked,
                user_data=emoji,
            )
            for emoji in preset_emojis
        ]
        custom_emoji_input_w = self.EditEmoji(wrap='clip')
        self._custom_emoji_text_w = urwid.Text('···', align='right')
        grid_items.extend((self._custom_emoji_text_w, custom_emoji_input_w))
        grid_w = urwid.GridFlow(
            grid_items,
            cell_width=6,
            h_sep=2,
            v_sep=1,
            align='center',
        )
        urwid.connect_signal(
            custom_emoji_input_w,
            'return',
            self._reaction_picked
        )
        super().__init__(urwid.Filler(grid_w))

    def _is_single_emoji(self, text):
        # A check to reject sending non-emoji reactions.
        # They are delivered successfully, but are not displayed by the official clients (see signal-cli#834).
        # The current simple regex test is meant to notify a user of accidentally entered non-emoji text. It does not catch all possible strings that are not displayed on official clients. It also rejects some emoji that normally *would* be displayed, e.g. combined ones, like 👨‍👩‍👦‍👦 that report len!=1.
        return self._emoji_regex.fullmatch(text)

    def _reaction_picked(self, _widget, emoji):
        """Close the picker and send `emoji` (or an empty reaction); show a warning for rejected text."""
        accepted = self._is_single_emoji(emoji) or not emoji
        if not accepted:
            self._custom_emoji_text_w.set_text('❌⚠️👉')
            return
        self._emit('closed')
        action_request.send_reaction(self._msg, emoji)
class PopUpPlaceholder(urwid.WidgetPlaceholder):
    """Placeholder that can overlay a pop-up box on top of the widget it was created with."""

    def __init__(self, w):
        super().__init__(w)
        # Urwid's terminology here might be confusing: "WidgetPlaceholder.original_widget" means "currently displayed widget", not the one it is originally initialized with.
        self._orig_w = w

    def _show_pop_up(
            self,
            widget,
            title='',
            buttons=True,
            shadow_len=2,
            remove_callback=None,
            **overlay_params
    ):
        """Display `widget` in a pop-up box over the underlying widget.

        `remove_callback`, if given, is called when the pop-up is closed.
        `overlay_params` override the default `urwid.Overlay` placement.
        """
        box = PopUpBox(widget, title, buttons, shadow_len)
        urwid.connect_signal(
            box,
            'closed',
            self._remove_pop_up,
            user_args=[remove_callback]
        )
        placement = {
            'align': 'center',
            'valign': 'middle',
            'width': ('relative', 85),
            'height': ('relative', 65),
            **overlay_params,
        }
        self.original_widget = urwid.Overlay(box, self._orig_w, **placement)

    def _remove_pop_up(self, remove_callback, *_sender_ws):
        """Call `remove_callback` (if any) and show the underlying widget again."""
        if remove_callback is not None:
            remove_callback()
        self.original_widget = self._orig_w

    @property
    def _is_popup_shown(self):
        return self.original_widget is not self._orig_w

    def show_help(self):
        """Show the help dialog pop-up."""
        self._show_pop_up(HelpDialog(), title='Help')

    def show_message_info(self, message_widget):
        """Show the details of the widget's message; the widget stays highlighted until the pop-up closes."""
        message_widget.highlight()
        info_w = MessageInfo(message_widget.msg)
        filled_w = urwid.Filler(info_w, height=('relative', 100), top=1, bottom=1)
        self._show_pop_up(
            filled_w,
            title='Message info',
            remove_callback=message_widget.unhighlight,
        )

    def show_reaction_picker(self, frame_top_bottom_method, size, visible):
        """Show the reaction picker for the focused message, placed relative to it.

        `visible` is the value emitted with the 'pick_reaction' signal;
        `visible[0]` is read as (offset, widget) of the focused message.
        """
        input_w_rows = frame_top_bottom_method(size, focus=True)[0][1]
        focus_offset, msg_widget = visible[0][0], visible[0][1]
        bottom_offset = input_w_rows + (size[1] - focus_offset) + 1
        msg_widget.highlight()
        self._show_pop_up(
            ReactionPicker(msg_widget.msg),
            buttons=False,
            shadow_len=0,
            remove_callback=msg_widget.unhighlight,
            align=msg_widget.align,
            left=2,
            right=2,
            valign='bottom',
            bottom=bottom_offset,
            width=30,
            height=7,
        )

    def keypress(self, size, key):
        key = super().keypress(size, key)
        # When popup is shown, do not pass keys to other widgets until it's closed
        return None if self._is_popup_shown else key
class MainWindow(urwid.WidgetWrap):
    """Top-level widget: contacts column, chat window (inside a pop-up placeholder) and status line."""

    def __init__(self, contacts, chats_data):
        self._chats_data = chats_data
        self.contacts_w = ContactsWindow(contacts, self._chats_data)
        self._paste_mode = False
        self.chat_w = ChatWindow()
        contacts_box = LineBoxHighlight(self.contacts_w)
        self._chat_win_box = LineBoxHighlight(self.chat_w)
        self._popup_ph = PopUpPlaceholder(self._chat_win_box)
        self._columns = urwid.Columns([
            ('weight', 1, contacts_box),
            ('weight', 3, self._popup_ph),
        ])
        # Kept so that the contacts column can be removed and re-inserted.
        self._contacts_column = self._columns.contents[0]
        self.status_line = StatusLine(self._chats_data.unread_counts.total)
        urwid.connect_signal(
            self.chat_w.chat_view,
            'pick_reaction',
            self._popup_ph.show_reaction_picker,
            user_args=[self.chat_w.frame_top_bottom]
        )
        super().__init__(urwid.Frame(self._columns, footer=self.status_line))

    @property
    def contacts_hidden(self):
        """True when the contacts column is not among the displayed columns."""
        return self._contacts_column not in self._columns.contents

    @contacts_hidden.setter
    def contacts_hidden(self, yes_no):
        currently_hidden = self.contacts_hidden
        if yes_no and not currently_hidden:
            self._columns.contents.remove(self._contacts_column)
        elif not yes_no and currently_hidden:
            self._columns.contents.insert(0, self._contacts_column)

    @property
    def focus_widget_name(self):
        """'contacts', or the chat window's focused part name when the chat column has focus."""
        if self.contacts_hidden or self._columns.focus_position == 1:
            return self.chat_w.focus_widget_name
        return 'contacts'

    @focus_widget_name.setter
    def focus_widget_name(self, widget_name):
        if widget_name == 'contacts':
            self.contacts_hidden = False
            self._columns.focus_position = 0
            return
        if cfg.contacts_autohide and not self.contacts_hidden:
            self.contacts_hidden = True
        # With the contacts column removed, the chat column is at index 0.
        self._columns.focus_position = 0 if self.contacts_hidden else 1
        self.chat_w.focus_widget_name = widget_name

    def _focus_next(self, reverse=False):
        """Cycle the focus through contacts, chat and input (backwards if `reverse`)."""
        curr_wname = self.focus_widget_name
        # If there are no messages in current chat (either because no chat selected, or searching has filtered out all results), don't focus it.
        skip_chat = not self.chat_w.chat_view.contents and curr_wname != 'chat'
        wnames = [
            name for name in ('contacts', 'chat', 'input')
            if not (skip_chat and name == 'chat')
        ]
        step = -1 if reverse else 1
        next_index = (wnames.index(curr_wname) + step) % len(wnames)
        self.focus_widget_name = wnames[next_index]

    def update_unread_count(self, contact_id):
        """Refresh the unread counter of the contact and the total in the status line."""
        self.contacts_w.contacts_list_w.update_contact_unread_count(contact_id)
        self.status_line.set_unread_count(self._chats_data.unread_counts.total)

    def on_contact_selected(self, contact, focus_widget):
        """Clear the status line, update counters and the chat window; optionally move the focus."""
        # NOTE: for now not checking what's currently in the status line, just remove whatever text was there.
        self.status_line.set_text('')
        self.update_unread_count(contact.id)
        self.chat_w.on_contact_selected(contact)
        if focus_widget:
            self.focus_widget_name = focus_widget

    def prompt_on_status_line(self, text, callback, callback_finally=None):
        """Ask for text input on the status line.

        `callback` receives the response unless it is None (the prompt was
        cancelled); `callback_finally`, if given, is called in either case.
        """
        def callback_wrapper(prompt_response):
            if prompt_response is not None:
                callback(prompt_response)
            if callback_finally is not None:
                callback_finally()
            self._w.focus_position = 'body'

        self.status_line.show_prompt(text, callback_wrapper)
        self._w.focus_position = 'footer'

    def prompt_on_status_line_yn(self, text, callback, default_response='y', callback_finally=None):
        """Ask a yes/no question; `callback` is called on 'y' / 'yes' (or an empty reply if that is the default)."""
        choices = " [y/n]: ".replace(
            default_response.lower(),
            default_response.upper()
        )
        text += choices

        def callback_wrapper(prompt_response):
            answer = prompt_response or default_response
            if answer.lower() in ('y', 'yes'):
                callback()

        self.prompt_on_status_line(text, callback_wrapper, callback_finally)

    def show_help(self):
        """Open the help pop-up and focus the chat column."""
        self._popup_ph.show_help()
        self.focus_widget_name = 'chat'

    def keypress(self, size, key):
        """Handle the global key bindings; return the keys that are not handled."""
        if self._paste_mode:
            # NOTE(review): while in paste mode every key except 'end paste' is returned
            # here without being passed to the child widgets - confirm this is intended.
            if key == 'end paste':
                self._paste_mode = False
                return None
            return key
        if key == 'esc':
            action_request.set_status_line('')
        key = super().keypress(size, key)
        if key == 'begin paste':
            self._paste_mode = True
        elif key == 'tab':
            self._focus_next()
        elif key == 'shift tab':
            self._focus_next(reverse=True)
        elif key == ':':
            # Move to the input line and let it receive the key.
            self.focus_widget_name = 'input'
            self.keypress(size, key)
        elif key == '/' and self.focus_widget_name == 'chat':
            self.focus_widget_name = 'input'
            self.keypress(size, key)
        elif key in ('meta j', 'meta down'):
            self.contacts_w.contacts_list_w.select_next_contact()
        elif key in ('meta k', 'meta up'):
            self.contacts_w.contacts_list_w.select_next_contact(reverse=True)
        elif key == '?':
            self.show_help()
        elif key == 'i':
            if self.focus_widget_name != 'chat':
                return key
            focused_msg_w = self.chat_w.chat_view.focus
            if focused_msg_w is None:
                return key
            self._popup_ph.show_message_info(focused_msg_w)
        else:
            return key
        return None
class UrwidUI:
    """Creates the main window and the urwid main loop, and exposes shortcuts to nested widgets."""

    def __init__(self, contacts, chats_data):
        main_w = MainWindow(contacts, chats_data)
        self.main_w = main_w
        # FYI: to later get the topmost widget, can also use `urwid_main_loop.widget`
        self.loop = urwid.MainLoop(main_w, palette=PALETTE)
        if cfg.color and cfg.color.high_color_mode:
            self.loop.screen.set_terminal_properties(256)
        MessageWidget.set_formatting_consants(cfg.use_formatting)

        # Shortcuts for deeply nested attributes
        self.contacts = main_w.contacts_w.contacts_list_w
        self.chat = main_w.chat_w.chat_view
        self.msg_ws_cache = self.chat.body.msg_ws_cache
# #############################################################################
# commands
# #############################################################################
class Commands:
    """Maps `:command` names and their aliases (case-insensitively) to action callables and runs them."""

    def __init__(self, actions):
        self._actions = actions
        self.cmd_mapping = [
            (['attach', 'a'], self._actions.attach),
            (['edit', 'e'], self._actions.external_edit),
            (['read', 'r'], self._actions.read),
            (['attachClip', 'c'], self._actions.attach_clip),
            (['openAttach', 'o'], self._actions.open_last_attach),
            (['openUrl', 'u'], self._actions.open_last_url),
            (['toggleNotifications', 'n'], self._actions.toggle_notifications),
            (['toggleAutohide', 'h'], self._actions.toggle_autohide),
            (['toggleContactsSort', 's'], self._actions.toggle_sort_contacts),
            (['renameContact'], self._actions.rename_contact),
            (['addContact'], self._actions.add_contact),
            (['reload'], self._actions.reload),
            (['help'], self._actions.show_help),
            (['quit', 'q'], self._actions.quit),
        ]
        self._map = {cmd.lower(): fn for cmds, fn in self.cmd_mapping for cmd in cmds}

    def exec(self, cmd, *args):
        """Run the command named `cmd` with `args` and return its result.

        Returns None, after writing a message to the status line, when the
        command is unknown, when `check_cmd_for_current_contact` rejects it,
        or when it is given too few or too many arguments.
        """
        fn = self._map.get(cmd.lower())
        if fn is None:
            self._actions.set_status_line(f"Command `{cmd}` not found")
            return None
        if not self._actions.check_cmd_for_current_contact(fn):
            self._actions.set_status_line(f":{cmd} Error: no contact currently selected")
            return None
        try:
            return fn(*args)
        except TypeError as err:
            # Handle only the exceptions produced by giving the wrong number of arguments to `fn()`, not any exceptions produced inside executing `fn()` (i.e. deeper in the stack trace)
            if err.__traceback__.tb_next is not None:
                raise
            err_text = str(err)
            if re.search(r"missing \d+ required positional argument", err_text):
                self._actions.set_status_line(f':{cmd} missing arguments')
                return None
            # Also matches the forms "takes from N to M positional arguments ..."
            # (callables with optional parameters) and "... but 1 was given".
            if re.search(
                    r"takes (?:from \d+ to )?\d+ positional arguments? but \d+ (?:was|were) given",
                    err_text,
            ):
                self._actions.set_status_line(f':{cmd} extra arguments')
                return None
            raise
class Actions:
def __init__(self, daemon, contacts, chats_data, urwid_ui):
    """Store the collaborating objects and pick the clipboard backend.

    `WLclip` is used when the installed clipboard manager is reported as
    'wl-paste', `Xclip` otherwise.
    """
    self._daemon = daemon
    self._contacts = contacts
    self._chats_data = chats_data
    self._urwid_ui = urwid_ui
    if ClipBase.get_installed_clipb_manager() == 'wl-paste':
        self._clip = WLclip()
    else:
        self._clip = Xclip()
def reload(self, callback=None, **callback_kwargs):
    """Start an asynchronous contacts update, or report that the daemon is not running."""
    if not self._daemon.is_dbus_service_running:
        self.set_status_line("reload error: signal-cli daemon is not running")
        return
    self.update_contacts_async(callback, **callback_kwargs)
def _update_contacts_ui(self):
self._urwid_ui.contacts.update()
# Updating the title text in chat widget:
try:
current_contact = self._contacts.map[self._chats_data.current_contact.id]
# Need to re-obtain the contact object, since the one in _chats_data now points to an outdated object
except (AttributeError, KeyError):
return
self._urwid_ui.main_w.chat_w.set_title(current_contact)
def set_status_line(self, text, append=False):
    """Show `text` on the status line, optionally appended to its current text."""
    status_line_w = self._urwid_ui.main_w.status_line
    status_line_w.set_text(text, append)
def callf(self, *args, **kwargs):
    """Wrapper that logs and swallows the exceptions"""
    try:
        result = callf(*args, **kwargs)
    except (OSError, ValueError) as err:
        logging.exception(err)
        status_text = '\n'.join((
            str(err),
            'Full error traceback written to log.',
        ))
        self.set_status_line(status_text)
        return None
    return result
def send_desktop_notification(self, sender, message, avatar=None):
if not cfg.enable_notifications:
return
if avatar is None:
avatar = 'scli'
rmap = {}
for token, text in (('%s', sender), ('%m', message), ('%a', avatar)):
text = text.replace(r"'", r"'\''")
rmap[token] = text
rmap['_optionals'] = ('%s', '%m', '%a')
self.callf(cfg.notification_command, rmap, background=True)
if not cfg.notification_no_bell:
print('\a', end='')
def send_message_curr_contact(self, message="", attachments=None):
if self._chats_data.current_contact is None:
return
self._daemon.send_message(self._chats_data.current_contact.id, message, attachments)
def external_edit(self, *args):
if cfg.editor_command is None:
self.set_status_line(":edit Error: no command for external editor set")
return
filename = ''
if args:
filename, *message = split_path(*args)
if is_path(filename):
msg_file_path = os.path.expanduser(filename)
else:
with tempfile.NamedTemporaryFile(
suffix='.md', delete=False
) as temp_fo:
msg_file_path = tmpfile = temp_fo.name
message = args
if message:
with open(msg_file_path, "w", encoding="utf-8") as msg_file:
msg_file.write(*message)
self._daemon.main_loop.stop()
cmd = " ".join((cfg.editor_command, shlex.quote(msg_file_path)))
self.callf(cmd)
self._daemon.main_loop.start()
with open(msg_file_path, 'r', encoding="utf-8") as msg_file:
msg = msg_file.read().strip()
if msg:
self.send_message_curr_contact(msg)
try:
os.remove(tmpfile)
except NameError:
pass
def read(self, path_or_cmd):
message = ''
if is_path(path_or_cmd):
try:
with open(os.path.expanduser(path_or_cmd), 'r', encoding="utf-8") as file:
message = file.read()
except OSError as err:
logging.exception(err)
self.set_status_line(str(err))
elif path_or_cmd.startswith('!'):
proc = self.callf(
path_or_cmd[1:].strip(),
capture_output=True,
)
if proc is not None:
message = proc.stdout
else:
self.set_status_line(f"Error: could not read `{path_or_cmd}`")
if message != '':
self.send_message_curr_contact(message)
def attach(self, args):
attachment, *message = split_path(args)
attachment = os.path.expanduser(attachment)
if not os.path.isfile(attachment):
self.set_status_line('File does not exist: ' + attachment)
return
self.send_message_curr_contact(*message, attachments=[attachment])
def attach_clip(self, *message):
files = self._clip.files()
if files:
self.send_message_curr_contact(*message, attachments=files)
else:
self.set_status_line('Clipboard is empty.')
def copy_to_clipb(self, text):
self._clip.put(text)
def send_reaction(self, msg, emoji):
is_remove = not emoji
if is_remove:
try:
current_reaction_envelope = msg.reactions[self._contacts.sigdata.own_num]
emoji = get_envelope_reaction(current_reaction_envelope)['emoji']
except (AttributeError, KeyError):
emoji = '👍'
# Official signal clients do not remove emojis if emoji=="". It needs to be any (non-empty) emoji.
self._daemon.send_reaction(
msg.contact_id,
emoji,
msg.sender_num,
msg.timestamp,
is_remove,
)
def open_file(self, path):
if not os.path.exists(path):
logging.warning("File does not exist: %s", path)
self.set_status_line('File does not exist: ' + path)
return None
return self.callf(cfg.open_command, {'%u': path}, background=True)
def open_attach(self, envelope):
attachments = get_envelope_attachments(envelope)
if not attachments:
quote = get_envelope_quote(envelope)
if quote is not None and quote.get('attachments'):
try:
msg_orig = self._chats_data.chats.get_msg_for_timestamp(
envelope,
quote['id'], # timestamp of orig message
quote['author'],
)
except (KeyError, ValueError):
pass
else:
self.open_attach(msg_orig.envelope)
if attachments:
for attachment in attachments:
file_path = get_attachment_path(attachment)
if file_path:
self.open_file(file_path)
return attachments
# Treating stickers as attachments with a different dir path
sticker = get_envelope_sticker(envelope)
if sticker:
file_path = get_sticker_file_path(sticker)
if file_path:
self.open_file(file_path)
return sticker
return None
def open_last_attach(self):
for msg in reversed(self._chats_data.current_chat):
if self.open_attach(msg.envelope):
return
def open_url(self, url):
return self.callf(cfg.open_command, {'%u': url}, background=True)
def open_urls(self, envelope):
txt = get_envelope_msg(envelope)
urls = get_urls(txt) if txt else []
for url in urls:
self.open_url(url)
return urls
def open_last_url(self):
for txt in reversed(self._chats_data.current_chat):
if self.open_urls(txt.envelope):
return
# pylint: disable=attribute-defined-outside-init
# `Config` class uses __setattr__ that forwards to argparser's `args` instance.
@staticmethod
def toggle_autohide():
cfg.contacts_autohide = not cfg.contacts_autohide
def toggle_sort_contacts(self):
cfg.contacts_sort_alpha = not cfg.contacts_sort_alpha
self.reload()
def toggle_notifications(self):
cfg.enable_notifications = not cfg.enable_notifications
notif = ''.join((
'Desktop notifications are ',
'ON' if cfg.enable_notifications else 'OFF',
'.'
))
self.set_status_line(notif)
# pylint: enable=attribute-defined-outside-init
def add_contact(self, args):
"""Add a new contact.
The syntax is
:addContact +NUMBER [Contact Name]
"""
try:
number, name = args.split(maxsplit=1)
except ValueError:
number, name = args, ""
if not is_number(number):
self.set_status_line(f':addContact "{number}": not a valid number')
return
self._daemon.rename_contact(number, name, is_group=False, callback=lambda *i: self.reload())
def rename_contact(self, args):
"""Rename contact.
:renameContact +NUMBER new name here -> use +NUMBER number
:renameContact "Old Name" new name here -> use contact named "Old Name"
:renameContact new name here -> rename current contact or group
"""
try:
number, new_name = split_path(args)
if not is_number(number):
for contact_id, contact in self._contacts.map.items():
if contact.name == number:
is_group = contact.is_group
break
else: # contact with name `number` not found
raise ValueError
elif self._contacts.get_by_id(number) is None:
self.set_status_line(f":renameContact Error: no contact with number {number} found")
return
else:
is_group = False
contact_id = number
except ValueError:
if self._chats_data.current_contact is None:
self.set_status_line(":renameContact Error: no contact currently selected")
return
contact_id = self._chats_data.current_contact.id
is_group = self._chats_data.current_contact.is_group
new_name = args
self._daemon.rename_contact(contact_id, new_name, is_group, lambda *i: self.reload())
def _delete_message(self, msg, index=None):
self._chats_data.chats.delete_message(msg, index)
self._chats_data.delivery_status.delete(msg.timestamp)
def delete_message_prompt(self, message_widget, index=None):
message_widget.highlight()
msg = message_widget.msg
self._urwid_ui.main_w.prompt_on_status_line_yn(
"Delete message from local history?",
callback=lambda: self._delete_message(msg, index),
callback_finally=message_widget.unhighlight,
)
def resend_message(self, msg, index=None):
if msg.delivery_status != 'send_failed':
# Only allow re-sending previously failed-to-send messages
raise TypeError
self._delete_message(msg, index)
self.set_status_line('') # remove 'send-failed' status line
envelope = msg.envelope
contact_id = get_envelope_contact_id(envelope)
message = get_envelope_msg(envelope)
attachments = get_envelope_attachments(envelope)
self._daemon.send_message(contact_id, message, attachments)
def send_remote_delete_prompt(self, message_widget):
msg = message_widget.msg
if msg.sender_num != self._contacts.sigdata.own_num:
return
message_widget.highlight()
self._urwid_ui.main_w.prompt_on_status_line_yn(
"Remote delete message?",
callback=lambda: self._daemon.send_remote_delete(
msg.contact_id,
msg.timestamp,
),
callback_finally=message_widget.unhighlight,
)
def update_contacts_async(self, callback=None, **callback_kwargs):
def on_contacts_updated():
current_contact = self._chats_data.current_contact
if current_contact is not None and current_contact.id not in self._contacts.map:
self._chats_data.current_contact = None
self._update_contacts_ui()
self._chats_data.contacts_cache = self._contacts.serialize()
self._daemon.unpause_message_processing()
if callback is not None:
callback(**callback_kwargs)
self._daemon.pause_message_processing()
self._update_indiv_contacts_async(
self._update_groups_async,
on_contacts_updated,
)
def _update_indiv_contacts_async(self, callback, *cb_args, **cb_kwargs):
def on_indiv_contacts_updated(contacts_dict):
self._contacts.update(contacts_dict, clear=True)
callback(*cb_args, **cb_kwargs)
self._daemon.get_indiv_contacts(
callback=on_indiv_contacts_updated
)
def _update_groups_async(self, callback, *cb_args, **cb_kwargs):
def on_got_groups_dicts(groups_dict):
self._contacts.update(groups_dict)
self._contacts.set_groups_membership()
callback(*cb_args, **cb_kwargs)
def on_got_groups_ids(groups_ids):
self._daemon.populate_groups_dict(
groups_ids,
callback=on_got_groups_dicts,
)
self._daemon.get_groups_ids(
callback=on_got_groups_ids
)
def show_new_msg_notifications(self, msg):
sender_name = self.get_contact_name(msg.sender_num)
contact_avatar = self._get_contact_avatar(msg.contact_id)
try:
*_, reaction_envelope = msg.reactions.values() # the latest reaction envelope
except (AttributeError, ValueError):
reaction_envelope = None
else:
sender_name = self.get_contact_name(
get_envelope_sender_id(reaction_envelope)
)
contact_avatar = self._get_contact_avatar(
get_envelope_contact_id(reaction_envelope)
)
def get_msg_notif():
if reaction_envelope is not None:
reaction = get_envelope_reaction(reaction_envelope)
if reaction.get('isRemove'):
return None
reaction_emoji = reaction['emoji']
return (reaction_emoji,
''.join((
'New reaction from ', repr(sender_name), ': ',
reaction_emoji,
)))
msg_text = msg.text
if msg_text:
return (msg_text,
''.join((
# NOTE: this could be a list (urwid.Text markup type), except for the textwrap.shorten below
'New message from ', repr(sender_name), ': ', repr(msg_text)
))
)
if msg.attachments:
return('[attachments]',
''.join((
'New attachments message from: ', repr(sender_name)
))
)
incoming_call = get_nested(msg.envelope, 'callMessage', 'offerMessage')
if incoming_call:
txt = '📞 Incoming call'
return(txt,
' '.join((
txt, 'from', repr(sender_name)
))
)
if get_envelope_sticker(msg.envelope):
return('[sticker]',
''.join((
'New sticker from: ', repr(sender_name)
))
)
return None
try:
msg_text, notif = get_msg_notif()
except TypeError:
return
notif = textwrap.shorten(notif, 80)
if reaction_envelope is None or cfg.notify_on_reactions:
self.send_desktop_notification(sender_name, msg_text, contact_avatar)
if (self._chats_data.current_contact is None
or msg.contact_id != self._chats_data.current_contact.id):
self.set_status_line(notif)
def get_contact_name(self, contact_num):
contact = self._contacts.get_by_id(contact_num)
return contact.name_or_id if contact else contact_num
def _get_contact_avatar(self, contact_id):
contact = self._contacts.get_by_id(contact_id)
return contact.avatar if contact else None
def check_cmd_for_current_contact(self, fn):
return (
self._chats_data.current_contact is not None
or fn not in (
self.external_edit,
self.read,
self.attach,
self.attach_clip,
self.open_last_attach,
self.open_last_url,
)
)
def show_help(self):
self._urwid_ui.main_w.show_help()
@staticmethod
def quit():
raise urwid.ExitMainLoop()
class ActionRequest:
    """Proxy that forwards attribute lookups to an `Actions` object set at a later time."""
    # The point of this class & its instance is to provide a *globally accessible* way for all UI classes to request an action (e.g. setting the status line text), without passing `Actions` instances down the class stack to every UI class that needs (or might need) it.
    # There might be a better OO way of doing this though.

    def __init__(self, actions=None):
        self.set_actions(actions)

    def set_actions(self, actions):
        """Set the object that subsequent attribute lookups are forwarded to."""
        self._actions = actions

    def __getattr__(self, method):
        # Only called for names not found on the proxy itself.
        target = self._actions
        return getattr(target, method)
# Module-level proxy, created with no `Actions` object attached; `Coordinate.__init__` attaches one through `set_actions()`.
action_request = ActionRequest()
# #############################################################################
# Coordinate
# #############################################################################
class Coordinate:
    """Creates the data, UI, daemon and actions objects and wires their callbacks together."""

    def __init__(self):
        self._chats_data = ChatsData(cfg.save_history)
        sigdata = SignalData(cfg.username)
        self._contacts = Contacts(sigdata, self._chats_data.contacts_cache)
        self._ui = UrwidUI(self._contacts, self._chats_data)
        self.daemon = Daemon(self._ui.loop, cfg.username)
        self._actions = Actions(self.daemon, self._contacts, self._chats_data, self._ui)
        self._commands = Commands(self._actions)
        action_request.set_actions(self._actions)
        self._connect_methods()

    def _connect_methods(self):
        """Hook this object's `_on_*` handlers up to the daemon, UI and data objects."""
        # Every daemon callback `name` is served by the method `_on_<name>` of this class.
        for cb_name in self.daemon.callbacks:
            self.daemon.callbacks[cb_name] = getattr(self, "_on_" + cb_name)
        urwid.connect_signal(self._ui.contacts, 'contact_selected', self._on_contact_selected)
        cfg.on_modified = self._on_cfg_changed
        self._chats_data.delivery_status.on_status_changed = self._ui.msg_ws_cache.on_delivery_status_changed
        Message.set_class_functions(
            get_delivery_status=self._chats_data.delivery_status.get_detailed,
            get_contact=self._contacts.get_by_id,
        )
        self._ui.main_w.chat_w.input_line_w.set_cmds(self._commands)
        self._chats_data.typing_indicators.set_alarm_in = self._ui.loop.set_alarm_in
        self._chats_data.typing_indicators.remove_alarm = self._ui.loop.remove_alarm

    def _on_sending_message(self, envelope):
        """Add an outgoing message to the chat and start tracking its delivery status."""
        group_members = None
        if is_envelope_group_message(envelope):
            group_id = get_envelope_contact_id(envelope)
            group = self._contacts.get_by_id(group_id)
            if group is not None:
                group_members = group.members_ids
            # `group` can be `None` if it is absent from the `groupStore` (for whatever reason), and we get a sync-ed message sent to `group` from another device. See #126.
        self._chats_data.delivery_status.on_sending_message(envelope, group_members)
        msg = self._chats_data.chats.add_envelope(envelope)
        self._ui.chat.try_set_focus(-1)
        self._ui.contacts.on_new_message(msg)

    def _on_sending_reaction(self, envelope):
        """Add an outgoing reaction to its message."""
        self._chats_data.delivery_status.on_sending_message(envelope)
        self._add_reaction(envelope)
        self._ui.chat.try_set_focus(-1)
        # Ensuring the last message does not get pushed down out of the view by the new reaction row.
        # None of the following alternatives solve this:
        # self._ui.chat.set_focus_valign('bottom'), focus_position=<orig_focus>, set_focus(<orig_focus>, coming_from='below')

    def _on_sending_done(self, envelope, status='sent', timestamp_adj=None):
        """Record the outcome of sending a message; report a failure on the status line."""
        self._chats_data.delivery_status.on_sending_done(envelope, status, timestamp_adj)
        self._ui.contacts.on_sending_done(envelope, status, timestamp_adj)
        try:
            chat, index = self._chats_data.chats.get_chat_index_for_envelope(envelope)
        except ValueError:
            return
        msg = chat[index]
        if status == 'send_failed':
            # The message might have no text (e.g. attachments only).
            msg_txt = textwrap.shorten(msg.text or '', 20)
            self._actions.set_status_line(
                f'Message "{msg_txt}" failed to send. '
                'Press `r` on message to re-send.',
                append=True,
            )
            self._ui.chat.try_set_focus(-1)
            return
        if timestamp_adj is not None:
            self._ui.msg_ws_cache.adjust_timestamp(msg, timestamp_adj)
            chat.adjust_timestamp(msg, timestamp_adj, index)
            self._ui.chat.try_set_focus(-1)
        self._chats_data.delivery_status.process_buffered_receipts(msg.timestamp)

    def _on_sending_reaction_done(self, envelope, status='sent', timestamp_adj=None):
        """Record the outcome of sending a reaction; on failure, remove it locally."""
        if status == 'sent':
            status = 'ignore_receipts'
        self._chats_data.delivery_status.on_sending_done(envelope, status, timestamp_adj)
        self._ui.contacts.on_sending_done(envelope, status, timestamp_adj)
        if status != 'send_failed':
            return
        reaction = get_envelope_reaction(envelope)
        emoji = reaction['emoji']
        self._actions.set_status_line(
            f'Reaction "{emoji}" failed to send.',
            append=True,
        )
        reaction['isRemove'] = True
        self._add_reaction(envelope)
        # NOTE: When an attempt to _replace_ an emoji with another one 'send_fail's, the old one will be removed (not shown) locally in scli, while continuing to be visible for the original recipients.

    def _process_msg_envelope(self, envelope):
        """Add a received message to the chats and run the new-message handling."""
        sender_num = get_envelope_sender_id(envelope)
        self._chats_data.typing_indicators.remove(sender_num)
        msg = self._chats_data.chats.add_envelope(envelope)
        self._on_new_message(msg)

    def _on_receive_message(self, envelope):
        """Handle a received message; for an unknown contact, reload the contacts first."""
        logging.info('Message envelope: %s', pprint.pformat(envelope))
        contact_id = get_envelope_contact_id(envelope)
        if contact_id in self._contacts.map:
            self._process_msg_envelope(envelope)
        else:
            def after_contacts_reload(new_contact):
                if new_contact is not None:
                    self._process_msg_envelope(envelope)
            self._on_unknown_contact(envelope, callback=after_contacts_reload)

    def _on_unknown_contact(self, envelope, callback):
        """Reload the contacts, then call `callback` with the sender's contact.

        If the sender is still unknown after the reload, a temporary contact is
        created, the message is added to its chat with an explanatory note, and
        `callback` is called with `None`.
        """
        logging.info("Message from unknown contact: %s", envelope)

        def after_contacts_loaded():
            contact_id = get_envelope_contact_id(envelope)
            contact = self._contacts.map.get(contact_id)
            if contact is not None:
                return callback(contact)
            logging.error("Message from unknown contact: %s", envelope)
            sender_info = (
                envelope.get("sourceName")
                or
                f"UUID: {contact_id}"
            )
            msg_text = get_envelope_msg(envelope)
            self._actions.set_status_line([
                'Message from an unknown chat: ',
                repr(sender_info),
                '\n',
                repr(msg_text),
                '\n',
                'Accept this contact on the primary device first.',
            ])
            # The envelope might carry no message text.
            self._actions.send_desktop_notification(sender_info, msg_text or '')
            contact_temp = Contact({
                "number": contact_id,
                "name": (
                    envelope.get("sourceName") or
                    envelope.get("sourceNumber") or
                    (envelope.get("source") or "")[:10] + "..."
                ) + " [UNKNOWN CONTACT]"
            })
            self._contacts.map[contact_id] = contact_temp
            self._contacts.indivs.add(contact_temp)
            try:
                envelope["dataMessage"]["message"] += "\n~~~\n[Message from an unknown contact. Accept on the primary device first]\n~~~"
            except (KeyError, TypeError):
                pass
            self._chats_data.chats.add_envelope(envelope)
            self._chats_data.unread_counts[contact_id] += 1
            self._ui.contacts.update()
            # `contact` is `None` here: the envelope has already been added above.
            return callback(contact)

        self._actions.reload(callback=after_contacts_loaded)

    def _on_receive_sync_message(self, envelope):
        """Treat a sync message as a message that has been sent successfully."""
        self._on_sending_message(envelope)
        self._on_sending_done(envelope)

    def _on_new_message(self, msg, increment_unread_count=True):
        """Update the UI, the unread count and the notifications for a new message."""
        self._ui.contacts.on_new_message(msg)
        contact_id = msg.contact_id
        if (self._chats_data.current_contact is not None
                and contact_id == self._chats_data.current_contact.id):
            self._ui.chat.try_set_focus(-1)
        elif increment_unread_count:
            self._chats_data.unread_counts[contact_id] += 1
            self._ui.main_w.update_unread_count(contact_id)
        self._actions.show_new_msg_notifications(msg)

    def _add_reaction(self, envelope):
        """Add a reaction to its message and refresh the message's widget.

        Returns the message, or `None` if the reaction was not added to any.
        """
        msg = self._chats_data.chats.add_reaction_envelope(envelope)
        if not msg:
            return None
        msg_w = self._ui.msg_ws_cache.get(msg)
        msg_w.update_reactions_w()
        return msg

    def _on_receive_receipt(self, envelope):
        self._chats_data.delivery_status.on_receive_receipt(envelope)

    def _on_receive_reaction(self, envelope):
        """Handle a received reaction and show a notification for it."""
        msg = self._add_reaction(envelope)
        if is_envelope_outgoing(envelope):
            # Do not show notifications for sync messages from linked devices
            return
        if msg is not None:
            self._on_new_message(msg, increment_unread_count=cfg.notify_on_reactions)
            # Not focusing on the received reaction message (same behavior as signal-desktop)
        else:
            # Show a notification for reaction to an "unknown" message (not in Chats)
            reaction = get_envelope_reaction(envelope)
            msg = Message({'source': reaction['targetAuthor']})
            msg.add_reaction(envelope)
            self._actions.show_new_msg_notifications(msg)

    def _on_daemon_log(self, log_line):
        """Show daemon log lines that need the user's attention on the status line."""
        if log_line.startswith("ERROR") and not self.daemon.is_dbus_service_running:
            self._actions.set_status_line([
                "signal-cli daemon has stopped:\n   ",
                log_line,
                "\nRestart scli to restart the daemon."
            ])
        elif "in use by another instance" in log_line:
            self._actions.set_status_line([
                "signal-cli: Config file is in use by another instance, waiting…\n",
                "Stop previously launched signal-cli processes to continue.",
            ])

    def _on_daemon_started(self):
        """Report that the daemon is up and load the contacts from it."""
        logging.info("signal-cli dbus service started")
        self._actions.set_status_line("Initializing signal-cli daemon... Done")

        def clear_status_line(*_args):
            self._actions.set_status_line("")

        self._ui.loop.set_alarm_in(2, clear_status_line)
        self._actions.update_contacts_async()
        self.daemon.get_signal_cli_version(callback=logging.info)

    def _on_contact_selected(self, contact, focus_widget):
        """Switch the chat view to `contact` and reset its unread count."""
        self._chats_data.current_contact = contact
        self._ui.chat.contents = self._chats_data.current_chat
        self._chats_data.unread_counts[contact.id] = 0
        self._ui.main_w.on_contact_selected(contact, focus_widget)

    def _on_cfg_changed(self, key, val):
        """React to a config value modified at runtime."""
        if key == 'contacts_autohide':
            self._ui.main_w.contacts_hidden = val

    def _on_contact_typing(self, envelope):
        self._chats_data.typing_indicators.on_typing_message(envelope)
        contact_id = get_envelope_contact_id(envelope)
        if (self._chats_data.current_contact is not None
                and contact_id == self._chats_data.current_contact.id):
            self._ui.chat.try_set_focus(-1)

    def _on_call_message(self, envelope):
        """Add offer, answer and normal hang-up call messages to the chat."""
        call_message = envelope['callMessage']
        if (
                'offerMessage' in call_message
                or 'answerMessage' in call_message
                or get_nested(call_message, 'hangupMessage', 'type') == 'NORMAL'
        ):
            msg = self._chats_data.chats.add_envelope(envelope)
            if 'offerMessage' in call_message:
                # Incoming call
                self._on_new_message(msg)

    def _on_contacts_sync(self):
        logging.info("Received contacts sync message, reloading signal-cli contacts")
        self._actions.reload()

    def _on_remote_delete(self, envelope):
        """Apply a remote delete to its message and redraw the message's widget."""
        msg = self._chats_data.chats.add_remote_delete_envelope(envelope)
        if not msg:
            return
        msg_w = self._ui.msg_ws_cache.get(msg)
        msg_w.reload_markup()

    def _on_sending_remote_delete_done(self, envelope, status='sent', _timestamp_adj=None):
        """On a failed remote delete, report it and undo the local deletion mark."""
        # Not tracking delivery receipts for remote delete messages. Letting them be buffered by DeliveryStatus and then discarded on exit.
        if status != 'send_failed':
            return
        self._actions.set_status_line(
            'Sending remote delete message failed.',
            append=True,
        )
        msg = self._chats_data.chats.add_remote_delete_envelope(envelope)
        if not msg:
            return
        delattr(msg, 'remote_delete')
        msg_w = self._ui.msg_ws_cache.get(msg)
        msg_w.reload_markup()

    def _on_untrusted_identity_err(self, envelope):
        """Report a changed safety number, for either a sent or a received message."""
        contact_id = envelope.get('target')
        if contact_id is not None:
            # Message sent to an untrusted identity
            notification = [
                "Contact's safety number has changed: ",
                contact_id,
                " (they might have reinstalled signal). ",
                "Run `signal-cli trust …` to resolve.",
            ]
            self._actions.set_status_line(notification)
            return
        # Message received from an untrusted identity
        contact_id = envelope['source']
        message = f"Message not decrypted: safety number with {contact_id} has changed"
        # Turning the envelope into a placeholder message shown in the chat.
        envelope["dataMessage"] = {
            "message": '[' + message + ']'
        }
        envelope["_received_timestamp"] = get_current_timestamp_ms()
        envelope["_artificialEnvelope"] = "untrustedIdentity"
        msg = self._chats_data.chats.add_envelope(envelope)
        self._on_new_message(msg)
        notification = message + " (they might have reinstalled signal).\nRun `signal-cli trust …` to resolve."
        self._actions.set_status_line(notification)

    def _on_user_unregistered_err(self, envelope):
        self._actions.set_status_line(
            f"Contact {envelope.get('target')} has unregistered. Can not send messages until they re-register."
        )

    def _on_receive_sticker(self, envelope):
        msg = self._chats_data.chats.add_envelope(envelope)
        self._on_new_message(msg)
# #############################################################################
# config
# #############################################################################
class Config:
    """Proxy around a config object (e.g. an argparse namespace).

    Reads of unknown attributes are forwarded to the wrapped object. Writes to
    attributes the wrapped object already has are forwarded to it as well, and
    are followed by a call to `on_modified(name, value)`.
    """

    def __init__(self, cfg_obj):
        self._cfg_obj = cfg_obj
        self.on_modified = noop

    def set(self, cfg_obj):
        """Replace the wrapped config object."""
        self._cfg_obj = cfg_obj

    def __getattr__(self, name):
        # Reached only for names not found on the proxy itself.
        wrapped = self._cfg_obj
        return getattr(wrapped, name)

    def __setattr__(self, name, value):
        forwarded = name != '_cfg_obj' and hasattr(self._cfg_obj, name)
        if not forwarded:
            # The proxy's own attributes (`_cfg_obj`, `on_modified`, ...) are stored on the proxy.
            super().__setattr__(name, value)
            return
        setattr(self._cfg_obj, name, value)
        self.on_modified(name, value)
# Module-level config proxy, created here without a wrapped object; presumably the parsed arguments are attached later through `Config.set()` (not visible in this part of the file).
cfg = Config(None)
# #############################################################################
# argparse
# #############################################################################
class CustomDefaultsHelpFormatter(argparse.ArgumentDefaultsHelpFormatter):
    """Show default values in `--help` output for custom-set default values.

    Modified `argparse.ArgumentDefaultsHelpFormatter` class that adds
        `(default: %(default)s)`
    to `--help` output, but only for the explicitly-set, meaningful `default`s: not `False` for `action=store_true` arguments, not `None` for `action=store` arguments (`action=store` is the default action for `argparse.add_argument()`, and `None` its default value), and not an empty string (which would print a dangling `(default: )`).
    """

    def _get_help_string(self, action):
        default = action.default
        # Identity checks rather than `in (None, False)`, which would also swallow a default of `0`, since `0 == False`.
        if default is None or default is False or default == '':
            return action.help
        return super()._get_help_string(action)
def make_arg_parser():
    """Build and return the command line parser, including the `link` subcommand."""
    parser = argparse.ArgumentParser(formatter_class=CustomDefaultsHelpFormatter)
    add = parser.add_argument

    def add_flag(*names, **kwargs):
        # Boolean switch: `False` unless given on the command line.
        parser.add_argument(*names, action='store_true', **kwargs)

    subcommands = parser.add_subparsers(
        dest='subcommand',
        description='Use `%(prog)s <subcommand> -h` for additional help.',
    )
    link_parser = subcommands.add_parser(
        'link',
        formatter_class=CustomDefaultsHelpFormatter,
        help='Link to an existing device.',
    )
    link_parser.add_argument(
        '-n', '--name',
        default='scli',
        help='Device name that will be shown in "Linked devices" list on primary device.',
    )

    add('-c', '--config-file',
        default=SCLI_CFG_FILE,
        help='Path to the config file. Arguments on the command line override settings in the file.')
    add('-u', '--username',
        help='Phone number starting with "+" followed by country code. If not given, %(prog)s will look for an existing profile in signal-cli\'s data dir.')
    add('-N', '--notification-command',
        default="notify-send -i '%a' scli '%s - %m'",
        help="Command to run when a new message arrives. %%m is replaced with the message, %%s is replaced with the sender, %%a is replaced with the path to the contact's avatar file if it exists, or with \"scli\" otherwise.")
    add('-o', '--open-command',
        default='xdg-open "%u"',
        help='File/URL opener command. %%u is replaced with the path.')
    add('-e', '--editor-command',
        help='External text editor command. If not set, %(prog)s checks among `$VISUAL`, `$EDITOR`, `sensible-editor` etc.')
    add('-G', '--clipboard-get-command',
        help='Command used by `:attachClip` to get a list of files to send as attachments. Should return one absolute file path per line. If not set, `xclip` or `wl-clipboard` is used.')
    add('-P', '--clipboard-put-command',
        help='Command to put text on clipboard. %%s will be replaced with the text. If not set, `xclip` or `wl-clipboard` is used.')
    add('-s', '--save-history',
        nargs='?',
        const=SCLI_HISTORY_FILE,
        default=False,
        metavar='HISTORY_FILE',
        help='Enable conversations history. History is saved in plain text. (default %(metavar)s: %(const)s)')
    add('--log-file',
        default=SCLI_LOG_FILE,
        help='Path to the log file. If not explicitly specified, logs are written only if `--debug` or `--save-history` are on.')
    add_flag('-n', '--enable-notifications',
             help='Enable desktop notifications. (See also --notification-command)')
    add_flag('--notify-on-reactions',
             help="Show notifications on receiving reaction messages.")
    add_flag('--notification-no-bell',
             help="Do not send a \"bell\" code to the terminal on notification. It sets the terminal window's urgency hint, making it more noticable. (The exact visual effect depends on the terminal emulator and the window manager)")
    add_flag('-f', '--use-formatting',
             help='Show _italic_, *bold*, ~strikethrough~ formatting in messages.')
    add('--color',
        nargs='?',
        const=True,
        default=False,
        help="Colorize messages. See README for options.")
    add('-w', '--wrap-at',
        default='85%',
        help="Wrap messages' text at a given number of columns / percentage of available screen width.")
    add_flag('--one-sided',
             help='Left-align both sent and received messages')
    add_flag('--show-names',
             help="Show contacts' names next to messages, even in one-to-one conversations.")
    add('--show-message-time',
        nargs='?',
        const='%H:%M',
        default='',
        metavar='FORMAT',
        help="Show messages' timestamps in the specified strftime %(metavar)s. (default: %(const)r)")
    add('--show-inline',
        nargs='?',
        const='columns',
        default=False,
        choices=('columns', 'wrap'),
        help="Print message's elements (sender's name, message text, attachment list, etc) on a single line.")
    add('--show-message-padding',
        nargs='?',
        const=' ',
        default=None,
        metavar='PAD',
        help="Insert a line of %(metavar)r characters between consecutive messages (default %(metavar)s: %(const)r)")
    # The `--group-contacts` name can be confusing, e.g. in:
    #   https://github.com/isamert/scli/issues/95#issuecomment-757502271
    # Kept for backwards compatibility, but hidden from `--help`. Use `--partition-contacts` instead.
    add_flag('--group-contacts',
             help=argparse.SUPPRESS)
    add_flag('--partition-contacts',
             help='Separate groups and individual contacts in the contacts list.')
    add_flag('--contacts-autohide',
             help='Autohide the contacts pane when it loses focus.')
    add_flag('--contacts-sort-alpha',
             help='Sort contacts alphabetically. (default: sort by the most recent message)')
    daemon_command = ''.join((
        'signal-cli ',
        '-u %u ',
        '--output=json ',
        #'--trust-new-identities=always '  # requires s-cli v0.9.0+; does not notify of safety number change (see signal-cli#826)
        'daemon',
    ))
    add('--daemon-command',
        default=daemon_command,
        help='Command for starting signal-cli daemon. The `%%u` in command will be replaced with username (phone number).')
    add_flag('--no-daemon',
             help='Do not start signal-cli daemon. Only useful for debugging %(prog)s.')
    add_flag('--debug',
             help='Verbose log output.')
    add('--version',
        action='version',
        version='%(prog)s ' + __version__)
    return parser
def get_cfg_file_args(file_obj):
    """Parse `name = value` lines into a dict of stripped strings.

    Blank lines and lines starting with `#` are skipped. A line is split on the
    first `=`; a line without `=` maps its whole text to an empty string.
    """
    # Alternatively, can override `ArgumentParser.convert_arg_line_to_args()`.
    # Could use `configparser` module if the syntax gets more complicated.
    entries = (raw_line.strip() for raw_line in file_obj)
    split_entries = (
        entry.partition("=")
        for entry in entries
        if entry and not entry.startswith('#')
    )
    return {key.strip(): value.strip() for key, _sep, value in split_entries}
def get_opt_val_flags(parser):
    """Flags that optionally take values.

    These are defined by
        ..., nargs='?', const=<anything>, default=False, ...
    See
        https://docs.python.org/3/library/argparse.html#nargs

    They allow any of the following forms on the command line:
        --color
        --color=high
        <nothing> (i.e. option omitted)
    In config file this corresponds to:
        color = true
        color = high
        color = false
            OR
        <nothing> (option not mentioned in config)

    Returns a frozenset of all the option strings (short and long) of such arguments.
    """
    # For these arguments, checking for the `False` values in the config file (e.g. 'false', 'f', 'no', etc) needs to be done explicitly, unlike for the regular flags that interpret any config value that is not in ('true', 't', 'yes', etc) as `False`.
    # The type of `const` is deliberately not checked: it is a bool for `--color`, but a string for `--save-history` and `--show-inline`, whose explicit config values would otherwise be silently dropped.
    return frozenset(
        opt
        for a in parser._actions  # pylint: disable=protected-access
        for opt in a.option_strings
        if (
            a.nargs == argparse.OPTIONAL
            and isinstance(a.default, bool)
        )
    )
def parse_cfg_file(parser, cli_args):
    """Merge the settings from the config file with the command line arguments.

    The config file's settings are converted to a list of command line style
    arguments and parsed with `parser`; the real command line is then parsed on
    top of the result, so that it overrides the file.

    Returns the resulting namespace. `cli_args` is returned unchanged if the
    config file is at the default path and does not exist. Exits with an error
    message if an explicitly given config file is missing, or if the file can
    not be read.
    """
    cfg_file_path = os.path.expanduser(cli_args.config_file)
    try:
        with open(cfg_file_path, encoding="utf-8") as cfg_f:
            cfg_f_args_dict = get_cfg_file_args(cfg_f)
    except FileNotFoundError:
        if cli_args.config_file == parser.get_default('config_file'):
            return cli_args
        sys.exit("ERROR: Config file not found: " + cfg_file_path)
    except OSError as err:
        sys.exit(f"ERROR: Could not read config file: {err}")

    true_vals = ('true', 't', 'yes', 'y')
    false_vals = ('false', 'f', 'no', 'n')
    opt_val_flags = get_opt_val_flags(parser)
    args_list = []
    for arg_name, arg_val in cfg_f_args_dict.items():
        arg_dest = arg_name.replace('-', '_')  # Assuming `dest` has not been overridden.
        if arg_dest not in cli_args:
            print("WARNING: encountered an unrecognized argument while parsing config file:", arg_name, file=sys.stderr)
            continue
        arg_default = parser.get_default(arg_dest)
        opt = '--' + arg_name
        # Values are passed in the `--opt=value` form, so that a value starting with `-` is not mistaken for another option.
        opt_with_val = f'{opt}={arg_val}'
        if isinstance(arg_default, bool):
            if arg_val.lower() in true_vals:
                args_list.append(opt)
            elif opt in opt_val_flags and arg_val.lower() not in false_vals:
                args_list.append(opt_with_val)
        else:
            args_list.append(opt_with_val)

    # Need to actually parse the arguments (rather than simply updating args.__dict__), so that the `type`s would be set correctly.
    cfg_file_args = parser.parse_args(args_list)
    # Re-parsing the command line into the same namespace: explicitly given options override the config file's.
    parser.parse_args(namespace=cfg_file_args)
    return cfg_file_args
def parse_wrap_at_arg(width):
    """Parse the string value of the wrap width option.

    Args:
        width: Either an integer (`42`) or a number followed by a single
            percent sign (`42.42%`).

    Returns:
        An `int` for an integer value, or a `('relative', <float>)` tuple for a
        percentage.

    Raises:
        SystemExit: If `width` is in neither of the two forms.
    """

    def bad_val():
        sys.exit(
            f"ERROR: Could not parse the width value: `{width}`.\n"
            "The value should be an `<int>` or a `<float>%` (`42` or `42.42%`).\n"
            "See `--help` for additional info."
        )

    if width.endswith('%'):
        try:
            # Drop exactly one trailing `%`, so that malformed values like `50%%` are rejected.
            percent_width = float(width[:-1])
        except ValueError:
            bad_val()
        # `float()` also accepts `nan` and `inf`, which are not usable as a width.
        if not float('-inf') < percent_width < float('inf'):
            bad_val()
        return ('relative', percent_width)
    else:
        try:
            return int(width)
        except ValueError:
            bad_val()
def parse_args():
    """Produce the final options namespace used by the program.

    Parses the command line, handles the `link` subcommand (which terminates
    the program), merges in the config file, and then fills in or converts the
    option values that need post-processing.

    Returns:
        The processed `argparse` namespace.
    """
    arg_parser = make_arg_parser()
    opts = arg_parser.parse_args()

    if opts.subcommand == 'link':
        link_device(opts.name)
        sys.exit()

    if opts.config_file:
        opts = parse_cfg_file(arg_parser, opts)

    # Fill in the values that were not given explicitly.
    if opts.editor_command is None:
        opts.editor_command = get_default_editor()
    if not opts.username:
        opts.username = detect_username()

    # Convert the raw values into the forms used by the rest of the program.
    if opts.color:
        opts.color = Color(opts.color)
    if opts.save_history:
        opts.save_history = os.path.expanduser(opts.save_history)

    # `group_contacts` is folded into `partition_contacts` and then removed from the namespace.
    if not opts.partition_contacts:
        opts.partition_contacts = opts.group_contacts
    del vars(opts)['group_contacts']

    opts.wrap_at = parse_wrap_at_arg(opts.wrap_at)
    return opts
# #############################################################################
# main
# #############################################################################
class BracketedPasteMode:
    """Context manager that keeps bracketed paste mode on inside the `with` block.

    The enabling escape sequence is written to `sys.stdout` on entry and the
    disabling one on exit. Exceptions raised inside the block are not
    suppressed.
    """
    # Same as tdryer's code
    # https://github.com/urwid/urwid/issues/119#issuecomment-761424363

    _ENABLE_SEQ = '\x1b[?2004h'
    _DISABLE_SEQ = '\x1b[?2004l'

    def __enter__(self):
        sys.stdout.write(self._ENABLE_SEQ)

    def __exit__(self, exc_type, exc_value, traceback):
        sys.stdout.write(self._DISABLE_SEQ)
def link_device(device_name):
    """Link this machine to an existing Signal account as a new device, then exit.

    Runs `signal-cli link`, prints the linking URI it outputs as a QR code in
    the terminal, waits for the linking to finish, and then runs
    `signal-cli receive` once for the detected account, echoing its output.

    Args:
        device_name: The name passed to `signal-cli link -n`.

    Raises:
        SystemExit: Always. The exit status is 0 on success; otherwise an error
            message is given (missing `pyqrcode` module, or a failed
            `signal-cli` command).
    """
    try:
        pyqrcode = importlib.import_module('pyqrcode')
    except ImportError:
        sys.exit(
            "ERROR: `pyqrcode` module not found. "
            "Please install it with `pip install pyqrcode`"
        )

    print("Retrieving QR code, please wait...")
    cmd_link = ['signal-cli', 'link', '-n', device_name]
    with subprocess.Popen(
            cmd_link, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    ) as proc_link:
        # The first line of the output is expected to be the linking URI.
        line = proc_link.stdout.readline().strip()
        if not line.startswith(('tsdevice:/', 'sgnl://linkdevice')):
            _, link_err = proc_link.communicate()
            sys.exit(
                "ERROR: Encountered a problem while linking:\n"
                f"{line}\n"
                f"{link_err}"
            )
        qr_obj = pyqrcode.create(line, version=10)
        print(qr_obj.terminal(module_color='black', background='white'))

        print(
            "Scan the QR code with Signal app on your phone and wait for the linking process to finish.\n"
            "You might need to zoom out for the QR code to display properly.\n"
            "This may take a moment..."
        )

        # `communicate()` keeps draining both pipes while waiting; a bare `wait()` can deadlock if the child fills a pipe's buffer.
        _, link_err = proc_link.communicate()
        if proc_link.returncode != 0:
            sys.exit(
                "ERROR: Encountered a problem while linking:\n"
                f"{link_err}"
            )

    print('Receiving data for the first time...')
    cmd_receive = ['signal-cli', '-u', detect_username(), 'receive']
    # stderr goes to a temporary file rather than a pipe: stdout is streamed line by line below, and an undrained stderr pipe could fill up and block the child.
    with tempfile.TemporaryFile(mode='w+') as receive_err:
        with subprocess.Popen(
                cmd_receive, stdout=subprocess.PIPE, stderr=receive_err, text=True
        ) as proc_receive:
            for receive_out in proc_receive.stdout:
                print(receive_out, end='')
            proc_receive.wait()
            if proc_receive.returncode != 0:
                receive_err.seek(0)
                sys.exit(
                    "ERROR: Encountered a problem while receiving:\n"
                    f"{receive_err.read()}"
                )

    print('Done.')
    sys.exit(0)
def detect_username():
    """Return the number of the only registered signal-cli account.

    The accounts are taken from `SignalData.parse_accounts_json()`. If that
    gives none, the signal-cli data folders (current and legacy) are searched
    for regular files whose names start with `+`.

    Returns:
        The account's number (string), when exactly one account is found.

    Raises:
        SystemExit: If no accounts or more than one distinct account is found.
    """
    ulist = [acc["number"] for acc in SignalData.parse_accounts_json()]
    if not ulist:
        for folder in (SIGNALCLI_DATA_FOLDER, SIGNALCLI_LEGACY_DATA_FOLDER):
            try:
                entries = os.listdir(folder)
            except FileNotFoundError:
                continue
            ulist.extend(
                entry for entry in entries
                if entry.startswith('+')
                and os.path.isfile(os.path.join(folder, entry))
            )

    # The same account can be listed more than once (e.g. present in both data folders); that should not count as multiple accounts. Keeps the original order.
    ulist = list(dict.fromkeys(ulist))

    if not ulist:
        sys.exit("ERROR: Could not find any registered accounts. "
                 "Register a new one or link with an existing device (see README).")
    elif len(ulist) == 1:
        return ulist[0]
    else:
        sys.exit("ERROR: Multiple accounts found. Run one of:\n\t"
                 + "\n\t".join(f"scli --username={u}" for u in ulist))
def main():
    """Program entry point.

    Creates the scli data directories, parses the options, configures logging,
    starts the signal-cli daemon (unless disabled), installs the handlers for
    termination signals and runs the main loop.

    Raises:
        SystemExit: If the attachments directory can not be created (or on
            errors while parsing the arguments).
    """
    try:
        # `exist_ok` accepts an already existing directory, but still fails if the path exists and is not a directory.
        os.makedirs(SCLI_ATTACHMENT_FOLDER, exist_ok=True)
    except OSError:
        sys.exit("ERROR: Could not create a directory in " + SCLI_DATA_FOLDER)

    args = parse_args()

    if args.debug:
        logging.basicConfig(filename=args.log_file, level=logging.DEBUG)
    elif args.save_history or args.log_file != SCLI_LOG_FILE:
        # Log warnings when the history is saved or a non-default log file is set.
        logging.basicConfig(filename=args.log_file, level=logging.WARNING)
    else:
        logging.disable()
    logging.info("scli %s", __version__)

    cfg.set(args)
    coord = Coordinate()
    loop = coord.daemon.main_loop

    if not args.no_daemon:
        proc = coord.daemon.start()
        # Make sure the started daemon process does not outlive scli.
        atexit.register(proc.kill)
        action_request.set_status_line("Initializing signal-cli daemon... ")

    # Quit through the regular quit action on SIGHUP / SIGTERM.
    for sig in (signal_ipc.SIGHUP, signal_ipc.SIGTERM):
        signal_ipc.signal(sig, lambda signum, frame: action_request.quit())

    with BracketedPasteMode():
        loop.run()
# Set at import time; `main()` writes this value to the log on startup.
__version__ = get_version()


# Run the program only when the file is executed as a script.
if __name__ == "__main__":
    main()