Files
slidgevoipms/session.py
2024-04-11 13:12:54 -07:00

145 lines
4.9 KiB
Python

"""
User actions
"""
import aiohttp
import asyncio
from datetime import datetime, timedelta
import logging
from pathlib import Path
from pytz import timezone
from typing import TYPE_CHECKING, Optional, Union
from slidge import BaseSession, GatewayUser, LegacyContact, LegacyRoster
log = logging.getLogger(__name__)
class Contact(LegacyContact[str]):
session: "Session"
class Roster(LegacyRoster[str, "Contact"]):
session: "Session"
async def jid_username_to_legacy_id(self, jid_username: str) -> str:
if len(jid_username) != 10: # TODO more in depth validation
raise XMPPError("bad-request", "This is not a valid 10 digit phone number")
return jid_username
API_URL = 'https://voip.ms/api/v1/rest.php'
EASTERN_TIME = timezone('America/New_York')
ASSETS_DIR = Path(__file__).parent / "assets"
#@dataclass
#class VoipMsSms:
# id: str
# date: str
# type: VoipMsSmsType
# did: str
# contact: str
# message: str
# col_media1: str
class Session(BaseSession[str, Contact]):
def __init__(self, user: GatewayUser):
self.httpsession = aiohttp.ClientSession()
self.stop_loop = False
super().__init__(user)
async def logout(self):
self.stop_loop = True
await asyncio.sleep(10) # ensure the loop is dead
self.httpsession.close()
async def login(self):
f = self.user.registration_form
async with self.httpsession.get(API_URL, params={
'api_username': f['username'],
'api_password': f['password'],
'method': 'getDIDsInfo',
'did': f['did'],
'content_type': 'json',
}) as response:
json = await response.json()
if json['status'] == 'success':
self.xmpp.loop.create_task(self.poll_loop())
return f"Connected as {json['dids'][0]['did']}"
else:
return f"Failure! {json['status']}"
async def poll_loop(self):
last_run_ids = []
last_run_time = datetime.now() - timedelta(seconds=10)
log.debug('poll loop initiated')
while not self.stop_loop:
current_run_time = datetime.now()
messages = await self.get_messages(last_run_time - timedelta(seconds=2))
new_messages = [message for message in messages if message['id'] not in last_run_ids]
last_run_ids = [message['id'] for message in messages]
for message in new_messages:
contact = await self.contacts.by_legacy_id(message['contact'])
if message['col_media1'] != '':
contact.send_file(file_url=message['col1_media'], legacy_message_id=message['id'])
elif message['message'] != '':
contact.send_text(message['message'], legacy_msg_id=message['id'])
last_run_time = current_run_time
await asyncio.sleep(10)
# See this issue for timezone explanation https://github.com/michaelkourlas/voipms-sms-client/issues/35
async def get_messages(self, from_time: datetime):
f = self.user.registration_form
async with self.httpsession.get(API_URL, params={
'api_username': f['username'],
'api_password': f['password'],
'method': 'getMMS',
'did': f['did'],
'from': from_time.astimezone(EASTERN_TIME).strftime('%Y-%m-%d %H:%M:%S'),
'timezone': -5,
'type': 1,
'all_messages': 1,
'content_type': 'json',
}) as response:
json = await response.json()
log.debug(f"received messages from voipms {json}")
if json['status'] != 'success':
return []
else:
return json['sms']
async def on_file(self, chat: Contact, url: str, **_kwargs):
f = self.user.registration_form
async with self.httpsession.get(API_URL, params={
'api_username': f['username'],
'api_password': f['password'],
'method': 'sendMMS',
'did': f['did'],
'dst': chat.legacy_id,
'media1': url,
'content_type': 'json',
}) as response:
json = await response.json()
if json['status'] != 'success':
raise XMPPError("Unable to send")
async def on_text(
self,
chat: Contact,
text: str,
**_kwargs
):
f = self.user.registration_form
async with self.httpsession.get(API_URL, params={
'api_username': f['username'],
'api_password': f['password'],
'method': 'sendSMS', #TODO support MMS
'did': f['did'],
'dst': chat.legacy_id,
'message': text,
'content_type': 'json',
}) as response:
json = await response.json()
if json['status'] != 'success':
raise XMPPError("Unable to send")