Files
slidgevoipms/session.py

128 lines
3.9 KiB
Python

"""
User actions
"""
import aiohttp
import asyncio
from datetime import datetime
from pathlib import Path
from pytz import timezone
from typing import TYPE_CHECKING, Optional, Union
from slidge import BaseSession, GatewayUser, LegacyContact, LegacyRoster
class Contact(LegacyContact[str]):
session: "Session"
class Roster(LegacyRoster[str, "Contact"]):
session: "Session"
async def jid_username_to_legacy_id(self, jid_username: str) -> str:
if len(jid_username) != 10: # TODO more in depth validation
raise XMPPError("bad-request", "This is not a valid 10 digit phone number")
return jid_username
API_URL = 'https://voip.ms/api/v1/rest.php'
EASTERN_TIME = timezone('America/New_York')
ASSETS_DIR = Path(__file__).parent / "assets"
#@dataclass
#class VoipMsSms:
# id: str
# date: str
# type: VoipMsSmsType
# did: str
# contact: str
# message: str
# col_media1: str
async def api_request(session, params):
async with session.get(API_URL, params=parFams) as response:
return await response.json()
class Session(BaseSession[str, Contact]):
def __init__(self, user: GatewayUser):
self.httpsession = aiohttp.ClientSession()
super().__init__(user)
def shutdown(self):
super().shutdown()
self.httpsession.close()
async def login(self):
f = self.user.registration_form
async with self.httpsession.get(API_URL, params={
'api_username': f['username'],
'api_password': f['password'],
'method': 'getDIDsInfo',
'did': f['did'],
'content_type': 'json',
}) as response:
json = await response.json()
if json['status'] == 'success':
return f"Connected as {json['dids'][0]['did']}"
else:
return f"Failure! {json['status']}"
async def poll_loop():
f = self.user.registration_form
while True:
pass
# See this issue for timezone explanation https://github.com/michaelkourlas/voipms-sms-client/issues/35
async def get_messages(self, from_time: datetime):
f = self.user.registration_form
async with self.httpsession.get(API_URL, params={
'api_username': f['username'],
'api_password': f['password'],
'method': 'getMMS',
'did': f['did'],
'from': from_time.astimezone(EASTERN_TIME).strftime('%Y-%m-%d %H:%M:%S'),
'timezone': -5,
'type': 1,
'all_messages': 1,
'content_type': 'json',
}) as response:
json = await response.json()
if json['status'] != 'success':
return []
else:
return json['sms']
async def on_file(self, chat: Contact, url: str, **_kwargs):
f = self.user.registration_form
async with self.httpsession.get(API_URL, params={
'api_username': f['username'],
'api_password': f['password'],
'method': 'sendMMS',
'did': f['did'],
'dst': chat.legacy_id,
'media1': url,
'content_type': 'json',
}) as response:
json = await response.json()
if json['status'] != 'success':
raise XMPPError("Unable to send")
async def on_text(
self,
chat: Contact,
text: str,
**_kwargs
):
f = self.user.registration_form
async with self.httpsession.get(API_URL, params={
'api_username': f['username'],
'api_password': f['password'],
'method': 'sendSMS', #TODO support MMS
'did': f['did'],
'dst': chat.legacy_id,
'message': text,
'content_type': 'json',
}) as response:
json = await response.json()
if json['status'] != 'success':
raise XMPPError("Unable to send")