Files
ArXtic/src/arxtic.py
Moussouni, Yaël 9f7a5ef1fb 2025-10-07: Update
2025-10-07 09:54:38 +02:00

199 lines
6.5 KiB
Python

#!/usr/bin/env python
#[TLP:AMBER] LIMITED DISTRIBUTION: WORK IN PROGRESS
"""
ArXtic:
ArXtic queries arXiv and filters the output.
@ Author: Moussouni, Yaël (MSc student; yael.moussouni@etu.unistra.fr)
@ Institution: Université de Strasbourg, CNRS, Observatoire astronomique
de Strasbourg, UMR 7550, F-67000 Strasbourg, France
@ Date: 2025-09-15
Licence:
ArXtic
Copyright (C) 2025 Yaël Moussouni (yael.moussouni@etu.unistra.fr)
arxtic.py
Copyright (C) 2025 Yaël Moussouni (yael.moussouni@etu.unistra.fr)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see www.gnu.org/licenses/.
"""
import os
import textwrap as tw
import feedparser as fp
# Runtime configuration, read from the environment.  os.environ.get() returns
# None for any variable that is not set.
FILTERS_DIR = os.environ.get("FILTERS_DIR")  # directory listed by get_filters()
DB_DIR = os.environ.get("DB_DIR")  # location of the "<library>.txt" files of save_arxiv_ids()
ARXIV_QUERY_URL = os.environ.get("ARXIV_QUERY_URL")  # prefix of the query built by get_arxiv_from_ids()
ARXIV_RSS_URL = os.environ.get("ARXIV_RSS_URL")  # feed parsed by get_arxiv_rss()
ADSABS_QUERY_URL = os.environ.get("ADSABS_QUERY_URL")  # not referenced in this part of the file
# ANSI SGR escape sequences used to colour terminal output.
COLOUR_DEFAULT="\033[0m"  # reset all attributes
COLOUR_INPUT="\033[36m"  # cyan
COLOUR_OUTPUT="\033[32m"  # green
COLOUR_INFO="\033[34m"  # blue
COLOUR_WARNING="\033[93m"  # bright yellow
COLOUR_ERROR="\033[91m"  # bright red
## General
def wrap(txt, length=80):
    """Re-flow ``txt`` so that lines are at most ``length`` characters wide.

    Words longer than ``length`` are never split, so such a word ends up on
    a line of its own that exceeds the limit.

    Args:
        txt: Text to wrap.
        length: Maximum line width (default 80).

    Returns:
        The wrapped lines joined with newline characters.
    """
    return tw.fill(txt, width=length, break_long_words=False)
def get_entries(rss):
    """Return whatever ``rss`` stores under its ``"entries"`` key.

    Args:
        rss: Mapping that has an ``"entries"`` key (here, the object
            returned by ``get_arxiv_rss``).

    Returns:
        The value of ``rss["entries"]``.
    """
    return rss["entries"]
def get_filters():
    """Load every filter file found in ``FILTERS_DIR``.

    Each non-hidden regular file of the directory yields one filter.  A file
    is read line by line:

    * a line containing ``#FIELD`` declares a field: the text between the
      first and second ``=`` sign, with double quotes and surrounding
      whitespace removed;
    * any other line starting with ``#`` is a comment and is ignored;
    * empty or whitespace-only lines are ignored;
    * every remaining line is a keyword (its newline is dropped).

    Returns:
        A list with one dictionary per filter file, of the form
        ``{"fields": [...], "values": [...]}``.
    """
    filters = []
    for name in os.listdir(FILTERS_DIR):
        # Hidden files (".gitkeep", editor swap files, ...) are not filters.
        if name.startswith("."):
            continue
        # os.path.join works whether or not FILTERS_DIR ends with a separator.
        path = os.path.join(FILTERS_DIR, name)
        # Sub-directories cannot be opened as filter files; skip them.
        if not os.path.isfile(path):
            continue
        rule = {"fields": [], "values": []}
        with open(path) as filter_file:
            for line in filter_file:
                if "#FIELD" in line:
                    field = line.split("=")[1].replace("\"", "").strip()
                    rule["fields"].append(field)
                elif line.startswith("#") or not line.strip():
                    # A whitespace-only line must not become a keyword: it
                    # would match nearly every entry.
                    continue
                else:
                    rule["values"].append(line.replace("\n", ""))
        filters.append(rule)
    return filters
## arXiv entries
def filter_entries(filters, entries):
    """Keep the entries that contain at least one filter keyword.

    For every entry, each keyword of each filter is searched
    (case-insensitively, as a substring) in ``str(entry[field])`` for each
    field of that filter.

    Args:
        filters: List of ``{"fields": [...], "values": [...]}`` dictionaries
            as produced by ``get_filters``.
        entries: List of mappings indexable by the field names.

    Returns:
        A tuple ``(filtered_entries, filtered_fields, filtered_keywords)`` of
        three parallel lists: the matching entries (in input order), and for
        each of them the fields and the keywords that matched, without
        duplicates and in order of first match.

    Raises:
        KeyError: If an entry lacks a field named by a filter that has at
            least one keyword.
    """
    # Upper-case every keyword once instead of once per entry and field.
    rules = [
        (rule["fields"], [(value, value.upper()) for value in rule["values"]])
        for rule in filters
    ]

    filtered_entries = []
    filtered_fields = []
    filtered_keywords = []
    for entry in entries:
        hit_fields = []
        hit_keywords = []
        for fields, keywords in rules:
            if not keywords:
                # Nothing to search for: do not even look the fields up.
                continue
            for field in fields:
                # Upper-case the field text once per field, not per keyword.
                text = str(entry[field]).upper()
                for value, value_upper in keywords:
                    if value_upper not in text:
                        continue
                    if field not in hit_fields:
                        hit_fields.append(field)
                    if value not in hit_keywords:
                        hit_keywords.append(value)
        if hit_fields:
            filtered_entries.append(entry)
            filtered_fields.append(hit_fields)
            filtered_keywords.append(hit_keywords)
    return filtered_entries, filtered_fields, filtered_keywords
def print_entries(entries, fields=None, keywords=None):
    """Print a coloured summary of every entry to standard output.

    For each entry the output is: a header line (id, announce type when the
    entry has one, link), the wrapped title, the wrapped author names, the
    wrapped summary without its first line, the optional filter details and
    a blank line.

    Args:
        entries: Sequence of entries with ``"id"``, ``"link"``, ``"title"``,
            ``"authors"`` (items with a ``"name"`` key) and ``"summary"``.
        fields: Optional sequence parallel to ``entries`` giving the matched
            field names of each entry.
        keywords: Optional sequence parallel to ``entries`` giving the
            matched keywords of each entry.

    Returns:
        Always ``0``.
    """
    filter_details = (
        ("Filtered field(s): ", fields),
        ("Filtered keyword(s): ", keywords),
    )
    for index, entry in enumerate(entries):
        # Header line, assembled piece by piece and printed in one go.
        header = [COLOUR_INFO, entry["id"]]
        if "arxiv_announce_type" in entry:
            header.append(" (" + entry["arxiv_announce_type"] + ")")
        header.append(" [" + entry["link"] + "]")
        header.append(COLOUR_DEFAULT)
        print(*header, sep="")

        print(COLOUR_DEFAULT + wrap(entry["title"]) + COLOUR_DEFAULT)

        author_names = ", ".join(author["name"] for author in entry["authors"])
        print(COLOUR_OUTPUT + wrap(author_names) + COLOUR_DEFAULT)

        # Everything after the first newline, i.e. the summary minus its
        # first line.
        abstract = entry["summary"].partition("\n")[2]
        print(COLOUR_INPUT + wrap(abstract) + COLOUR_DEFAULT)

        for label, per_entry in filter_details:
            if per_entry is not None:
                print(COLOUR_ERROR
                      + label
                      + ", ".join(per_entry[index])
                      + COLOUR_DEFAULT)
        print()
    return 0
## arXiv IDs
def get_arxiv_ids(entries):
    """Collect the ``"id"`` value of every entry.

    Args:
        entries: Iterable of mappings that have an ``"id"`` key.

    Returns:
        A list of the ids, in the order of ``entries``.
    """
    return [entry["id"] for entry in entries]
def save_arxiv_ids(ids, library="saved"):
    """Append new arXiv ids to the text file of a library.

    The ``"oai:"`` and ``"arXiv.org:"`` prefixes are removed from every id.
    Ids already present in ``<DB_DIR>/<library>.txt`` are not written again,
    and an id repeated within ``ids`` is written only once.  The file is
    created when it does not exist.

    Args:
        ids: A single id string, or an iterable of id strings (list, tuple,
            array of strings, ...).
        library: Base name of the file, without the ``.txt`` extension.

    Returns:
        Always ``0``.

    Raises:
        TypeError: If ``ids`` is neither a string nor an iterable.
    """
    # Duck-typed check: the previous isinstance test referenced a numpy
    # module that this file never imports, so it failed with NameError for
    # anything that was not a list.
    if isinstance(ids, str):
        ids = [ids]
    else:
        try:
            ids = list(ids)
        except TypeError:
            raise TypeError(
                "The type of ids ({}) is not recognized".format(type(ids))
            ) from None
    ids = [i.replace("oai:", "").replace("arXiv.org:", "") for i in ids]

    db_path = os.path.join(DB_DIR, library + ".txt")
    # "a+" creates the file when missing, allows reading after seek(0) and
    # always writes at the end of the file.
    with open(db_path, "a+") as db_file:
        db_file.seek(0)
        lines = db_file.readlines()
        known_ids = {line.replace("\n", "") for line in lines}
        # Do not glue the first new id onto an unterminated last line.
        if lines and not lines[-1].endswith("\n"):
            db_file.write("\n")
        for i in ids:
            if i not in known_ids:
                db_file.write(i + "\n")
                known_ids.add(i)
    return 0
## arXiv
def get_arxiv_rss():
    """Parse the feed located at ``ARXIV_RSS_URL``.

    Returns:
        The object returned by ``feedparser.parse`` for that URL.
    """
    return fp.parse(ARXIV_RSS_URL)
def today_arxiv():
    """Fetch the arXiv feed, filter it, save and print the matches.

    The filters are loaded first, then the feed is fetched and its entries
    are filtered.  The ids of the matching entries are stored with
    ``save_arxiv_ids`` (default library) and the matches are printed with
    ``print_entries``.

    Returns:
        The tuple ``(entries, fields, keywords)`` produced by
        ``filter_entries``.
    """
    matches, fields, keywords = filter_entries(
        get_filters(),
        get_entries(get_arxiv_rss()),
    )
    save_arxiv_ids(get_arxiv_ids(matches))
    print_entries(matches, fields, keywords)
    return matches, fields, keywords
def get_arxiv_from_ids(ids):
    """Query arXiv for the given ids.

    The ``"oai:"`` and ``"arXiv.org:"`` prefixes are removed from every id,
    then ``"id_list=<id1>,<id2>,..."`` is appended to ``ARXIV_QUERY_URL`` and
    the resulting URL is parsed with ``feedparser``.

    Args:
        ids: A single id string, or an iterable of id strings (list, tuple,
            array of strings, ...).

    Returns:
        The object returned by ``feedparser.parse`` for the query URL.

    Raises:
        TypeError: If ``ids`` is neither a string nor an iterable.
    """
    # Duck-typed check: the previous isinstance test referenced a numpy
    # module that this file never imports, so it failed with NameError for
    # anything that was not a list.
    if isinstance(ids, str):
        ids = [ids]
    else:
        try:
            ids = list(ids)
        except TypeError:
            raise TypeError(
                "The type of ids ({}) is not recognized".format(type(ids))
            ) from None
    ids = [i.replace("oai:", "").replace("arXiv.org:", "") for i in ids]

    query = ARXIV_QUERY_URL + "id_list=" + ",".join(ids)
    return fp.parse(query)
## ADS-ABS
def get_adsabs_from_ids(ids):
    """Placeholder for an ADS-ABS lookup; not implemented yet.

    Args:
        ids: Currently ignored.

    Returns:
        Always ``None``.
    """
    result = None
    return result
# Executed whenever this file is run or imported: runs today_arxiv() and
# keeps the three values it returns as module-level names.
entries, fields, keywords = today_arxiv()