135 lines
4.3 KiB
Python
135 lines
4.3 KiB
Python
"""
|
|
User actions
|
|
"""
|
|
|
|
import aiohttp
|
|
import asyncio
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from pytz import timezone
|
|
from typing import TYPE_CHECKING, Optional, Union
|
|
from slidge import BaseSession, GatewayUser, LegacyContact, LegacyRoster
|
|
|
|
class Contact(LegacyContact[str]):
|
|
session: "Session"
|
|
|
|
class Roster(LegacyRoster[str, "Contact"]):
|
|
session: "Session"
|
|
|
|
async def jid_username_to_legacy_id(self, jid_username: str) -> str:
|
|
if len(jid_username) != 10: # TODO more in depth validation
|
|
raise XMPPError("bad-request", "This is not a valid 10 digit phone number")
|
|
return jid_username
|
|
|
|
API_URL = 'https://voip.ms/api/v1/rest.php'
|
|
EASTERN_TIME = timezone('America/New_York')
|
|
ASSETS_DIR = Path(__file__).parent / "assets"
|
|
|
|
#@dataclass
|
|
#class VoipMsSms:
|
|
# id: str
|
|
# date: str
|
|
# type: VoipMsSmsType
|
|
# did: str
|
|
# contact: str
|
|
# message: str
|
|
# col_media1: str
|
|
|
|
class Session(BaseSession[str, Contact]):
|
|
def __init__(self, user: GatewayUser):
|
|
self.httpsession = aiohttp.ClientSession()
|
|
super().__init__(user)
|
|
|
|
def shutdown(self):
|
|
super().shutdown()
|
|
self.httpsession.close()
|
|
|
|
async def login(self):
|
|
f = self.user.registration_form
|
|
async with self.httpsession.get(API_URL, params={
|
|
'api_username': f['username'],
|
|
'api_password': f['password'],
|
|
'method': 'getDIDsInfo',
|
|
'did': f['did'],
|
|
'content_type': 'json',
|
|
}) as response:
|
|
json = await response.json()
|
|
if json['status'] == 'success':
|
|
return f"Connected as {json['dids'][0]['did']}"
|
|
else:
|
|
return f"Failure! {json['status']}"
|
|
|
|
async def poll_loop():
|
|
while True:
|
|
last_15s = datetime.now() - timedelta(seconds=15)
|
|
messages = await get_messages(last_15s)
|
|
|
|
for message in messages:
|
|
if 'col_media1' in message:
|
|
self.xmpp.send_file(file_url=message['col1_media'], legacy_message_id=message['id'])
|
|
elif 'message' in message:
|
|
self.xmpp.send_text(message['message'], legacy_msg_id=message['id'])
|
|
await asyncio.sleep(15)
|
|
|
|
|
|
# See this issue for timezone explanation https://github.com/michaelkourlas/voipms-sms-client/issues/35
|
|
async def get_messages(self, from_time: datetime):
|
|
f = self.user.registration_form
|
|
async with self.httpsession.get(API_URL, params={
|
|
'api_username': f['username'],
|
|
'api_password': f['password'],
|
|
'method': 'getMMS',
|
|
'did': f['did'],
|
|
'from': from_time.astimezone(EASTERN_TIME).strftime('%Y-%m-%d %H:%M:%S'),
|
|
'timezone': -5,
|
|
'type': 1,
|
|
'all_messages': 1,
|
|
'content_type': 'json',
|
|
}) as response:
|
|
json = await response.json()
|
|
|
|
if json['status'] != 'success':
|
|
return []
|
|
else:
|
|
return json['sms']
|
|
|
|
# SMS doesn't care about presence
|
|
async def on_presence(resource: str, show, status: str, resources, merged_resource):
|
|
pass
|
|
|
|
async def on_file(self, chat: Contact, url: str, **_kwargs):
|
|
f = self.user.registration_form
|
|
async with self.httpsession.get(API_URL, params={
|
|
'api_username': f['username'],
|
|
'api_password': f['password'],
|
|
'method': 'sendMMS',
|
|
'did': f['did'],
|
|
'dst': chat.legacy_id,
|
|
'media1': url,
|
|
'content_type': 'json',
|
|
}) as response:
|
|
json = await response.json()
|
|
|
|
if json['status'] != 'success':
|
|
raise XMPPError("Unable to send")
|
|
|
|
async def on_text(
|
|
self,
|
|
chat: Contact,
|
|
text: str,
|
|
**_kwargs
|
|
):
|
|
f = self.user.registration_form
|
|
async with self.httpsession.get(API_URL, params={
|
|
'api_username': f['username'],
|
|
'api_password': f['password'],
|
|
'method': 'sendSMS', #TODO support MMS
|
|
'did': f['did'],
|
|
'dst': chat.legacy_id,
|
|
'message': text,
|
|
'content_type': 'json',
|
|
}) as response:
|
|
json = await response.json()
|
|
|
|
if json['status'] != 'success':
|
|
raise XMPPError("Unable to send") |