from os import environ
import requests
import re
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
import asyncio
from nio import ClientConfig, AsyncClient, LoginResponse, InviteEvent
def get_accesstoken_from_file(accesstoken_path):
accesstoken_file = open(accesstoken_path, 'r')
single_accesstoken = accesstoken_file.read().strip()
accesstoken_file.close()
return single_accesstoken
async def on_event(room, event):
if hasattr(event, 'membership'):
if event.membership == 'invite':
# automatically join invites
print('joining '+room.room_id)
join = await matrix[event.source['state_key']].join(room.room_id)
print(join)
def get_blog():
url = 'https://news.blizzard.com/en-us/'
html = requests.get(url).text
soup = BeautifulSoup(html, 'html.parser')
base_url = 'https://news.blizzard.com'
blog = []
feature_list_html = soup.find_all(class_='FeaturedArticle-link')
for feature_html in feature_list_html:
image_html = feature_html.find(class_='Card-image')
image_url_fragment = re.findall('url\("(.*?)"\)', image_html.attrs['style'])[0]
image_url = 'https:'+image_url_fragment
text_list = feature_html.find_all(class_='text-truncate-ellipsis')
blog.append({
'image': image_url,
'game': text_list[0].contents[0].replace(' ', '').lower(),
'title': text_list[1].contents[0],
'description': '',
'url': base_url+feature_html.attrs['href'],
})
article_list_html = soup.find_all(class_='ArticleListItem')
for article_html in article_list_html:
image_html = article_html.find(class_='ArticleListItem-image')
image_url_fragment = re.findall('url\((.*?)\)', image_html.attrs['style'])[0]
image_url = 'https:'+image_url_fragment
content_html = article_html.find(class_='ArticleListItem-contentGrid')
blog.append({
'image': image_url,
'game': content_html.find(class_='ArticleListItem-subtitle').find(class_='ArticleListItem-labelInner').contents[0].replace(' ', '').lower(),
'title': content_html.find(class_='ArticleListItem-title').contents[0],
'description': content_html.find(class_='ArticleListItem-description').find(class_='h6').contents[0],
'url': base_url+article_html.find(class_='ArticleLink').attrs['href'],
})
return blog
def get_body(post):
body = post['title']+"\n"
if post['description']:
body += post['description']+"\n"
body += post['url']
return body
def get_formatted_body(post):
formatted_body = ''
formatted_body += ''+post['title']+'
'
formatted_body += ''
if post['description']:
formatted_body += '
'+post['description']+'
' return formatted_body async def main(): event_type_prefix = 'de.lubiland.snowstorm-matrix.' next_batch = {} for game in device: # initialize new client mxid = '@'+mxid_prefix+game+':'+homeserver_name config = ClientConfig(store_sync_tokens=True) matrix[mxid] = AsyncClient(homeserver_url, config=config) # login login_response = LoginResponse(mxid, device[game]['id'], device[game]['accesstoken']) await matrix[mxid].receive_response(login_response) matrix[mxid].add_event_callback(on_event, InviteEvent) # do a first sync sync_filter = { 'room': { 'state': { 'types': ['m.room.member'], 'lazy_load_members': True }, 'timeline': { 'types': ['invalid'] }, 'ephemeral': { 'types': ['invalid'] } } } next_batch_state = await matrix[mxid].room_get_state_event(room_id=admin_room, event_type=event_type_prefix+'next_batch', state_key=mxid) if 'token' in next_batch_state.content: try: sync = await matrix[mxid].sync(timeout=3000, sync_filter=sync_filter, since=next_batch_state.content['token']) next_batch[mxid] = sync.next_batch continue except: pass # when there is no next_batch token or first sync threw an error, # then do a first sync without next_batch print('doing first sync for '+mxid) sync = await matrix[mxid].sync(timeout=3000, sync_filter=sync_filter) next_batch[mxid] = sync.next_batch next_update = datetime.now() while True: if next_update < datetime.now(): blog = get_blog() for post in blog: mxid = '@'+mxid_prefix+post['game']+':'+homeserver_name if mxid in matrix: content = { 'msgtype': 'm.notice', 'body': get_body(post), 'format': 'org.matrix.custom.html', 'formatted_body': get_formatted_body(post) } for room_id in matrix[mxid].rooms: await matrix[mxid].room_send(room_id=room_id, message_type='m.room.message', content=content) else: content = { 'msgtype': 'm.notice', 'body': 'No accesstoken for '+mxid+' available.', 'formatted_body': ('Noaccesstoken
for '+
''+mxid+'
available.')
}
# send the message with the first available matrix client,
# because we will always have at least one accesstoken
await matrix[next(iter(matrix))].room_send(room_id=admin_room,
message_type='m.room.message',
content=content)
next_update = datetime.now() + timedelta(minutes=30)
next_batch_state = await matrix[mxid].room_get_state_event(room_id=admin_room,
event_type=event_type_prefix+'next_batch',
state_key=mxid)
for mxid in next_batch:
sync = await matrix[mxid].sync(timeout=10000,
sync_filter=sync_filter,
since=next_batch[mxid])
next_batch[mxid] = sync.next_batch
await matrix[mxid].room_put_state(room_id=admin_room,
event_type=event_type_prefix+'next_batch',
state_key=mxid,
content={'token': next_batch[mxid]})
homeserver_name = environ['HOMESERVER_NAME']
homeserver_url = environ['HOMESERVER_URL']
mxid_prefix = environ['MXID_PREFIX']
admin_room = environ['ADMIN_ROOM']
device = {}
for var in environ:
if (game := re.match('^DEVICEID_([A-Z]*)$', var)) is not None:
device[game[1].lower()] = {'id': environ[var]}
for var in environ:
if (game := re.match('^ACCESSTOKEN_([A-Z]*)_FILE$', var)) is not None:
device[game[1].lower()]['accesstoken'] = get_accesstoken_from_file(environ[var])
matrix = {}
asyncio.run(main())