mirror of
https://codeberg.org/Yael-II/Astrobs-Tools.git
synced 2026-03-15 03:16:27 +01:00
Some fresh air
This commit is contained in:
53
Source/ASTROBS_SELECT_v1.py
Normal file
53
Source/ASTROBS_SELECT_v1.py
Normal file
@@ -0,0 +1,53 @@
|
||||
import SCOPE_v2_2
|
||||
import EQUATOR_v1_1
|
||||
|
||||
QUIT = ["QUIT", "EXIT", "Q"]
|
||||
DONE = ["DONE", "OK"]
|
||||
SCOPE = ["SCOPE"]
|
||||
EQUATOR = ["EQUATOR"]
|
||||
|
||||
stop = False
|
||||
first = True
|
||||
|
||||
|
||||
|
||||
while not stop:
|
||||
if first:
|
||||
print("\033[35m"
|
||||
+ "{:^80}".format("=== [ ASTROBS TOOLS ] ===")
|
||||
+"\033[0m")
|
||||
first = False
|
||||
print("\033[32m"
|
||||
+ "Please select a tool:"
|
||||
+ "\033[0m")
|
||||
print("\033[32m"
|
||||
+ "\tSCOPE: "
|
||||
+ "\033[0m"
|
||||
+ "Sky Coordinates for Observations Python Estimator")
|
||||
print("\033[32m"
|
||||
+ "\tEQUATOR: "
|
||||
+ "\033[0m"
|
||||
+ "Equator Queries simbAd to create Tables of Objects")
|
||||
choice = input("\033[32m" + "Choice: " + "\033[0m")
|
||||
|
||||
if choice.upper() in DONE + ["", " "]:
|
||||
None
|
||||
elif choice.upper() in SCOPE:
|
||||
print("\033[35m"
|
||||
+ "{:^80}".format("=== [ SCOPE ] ===")
|
||||
+"\033[0m")
|
||||
first = True
|
||||
SCOPE_v2_2.main()
|
||||
elif choice.upper() in EQUATOR:
|
||||
print("\033[35m"
|
||||
+ "{:^80}".format("=== [ EQUATOR ] ===")
|
||||
+"\033[0m")
|
||||
first = True
|
||||
EQUATOR_v1_1.main()
|
||||
elif choice.upper() in QUIT:
|
||||
stop = True
|
||||
else:
|
||||
print("\033[93m"
|
||||
+ "Warning: {} is not recognized".format(choice)
|
||||
+ "\033[0m")
|
||||
|
||||
57
Source/EQUATOR_v1_1.py
Normal file
57
Source/EQUATOR_v1_1.py
Normal file
@@ -0,0 +1,57 @@
|
||||
"""
|
||||
* EQUATOR: Equator Queries simbAd to create Tables of Objects
|
||||
* Version 1 - 2024-09-21
|
||||
@ Yaël Moussouni
|
||||
@ Observatory of Strasbourg
|
||||
"""
|
||||
import astrobs_v1_toolbox as toolbox
|
||||
import numpy as np
|
||||
import os
|
||||
|
||||
YES = ["YES", "Y", "1"]
|
||||
QUIT = ["QUIT", "EXIT", "Q"]
|
||||
OUT_DIR = "./Output/"
|
||||
|
||||
|
||||
# TODO objects --> targets!
|
||||
# TODO https://astroquery.readthedocs.io/en/latest/imcce/imcce.html#miriade-ephemeris-service
|
||||
|
||||
def main():
|
||||
stop = False
|
||||
targets = toolbox.create_table()
|
||||
try:
|
||||
files = os.listdir(OUT_DIR)
|
||||
files = np.sort([file[:-4] for file in files if file[-4:] == ".cfg"])
|
||||
print("\033[32m"
|
||||
+ "Please select a configurations:"
|
||||
+ "\033[0m")
|
||||
for i in range(len(files)):
|
||||
print("\033[32m"
|
||||
+ "\t{}. ".format(i+1)
|
||||
+ "\033[0m"
|
||||
+ files[i])
|
||||
selection = int(input("\033[32m"
|
||||
+ "Choice: "
|
||||
+ "\033[0m"))-1
|
||||
if selection > len(files):
|
||||
selection = -1
|
||||
print("\033[93m"
|
||||
+ ("Warning: Configuration unavailable, "
|
||||
"selected the last one instead.")
|
||||
+ "\033[0m")
|
||||
config_name = files[selection]
|
||||
config = toolbox.read_cfg(config_name)
|
||||
except Exception as e:
|
||||
config = toolbox.read_cfg(" ")
|
||||
while not stop:
|
||||
ans = input("\033[32m"
|
||||
+ "Select an operation: "
|
||||
+ "\033[0m")
|
||||
if ans.upper() in QUIT:
|
||||
stop = True
|
||||
toolbox.resolve_input(ans, targets, config)
|
||||
targets.pprint()
|
||||
return 0
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
407
Source/SCOPE_v2_2.py
Executable file
407
Source/SCOPE_v2_2.py
Executable file
@@ -0,0 +1,407 @@
|
||||
"""
|
||||
* SCOPE: Sky Coordinates for Observations Python Estimator
|
||||
* Version 2.1 - 2024-09-21
|
||||
@ Yaël Moussouni
|
||||
@ Observatory of Strasbourg
|
||||
"""
|
||||
import astropy.time as time
|
||||
import astropy.coordinates as coord
|
||||
import astropy.units as u
|
||||
|
||||
YES = ["YES", "Y", "1"]
|
||||
OUT_DIR = "./Output/"
|
||||
# Definitions
|
||||
def get_sun(sun_time, sun_frame, angle, reverse=False):
|
||||
i = 0
|
||||
while coord.get_sun(sun_time).transform_to(sun_frame).alt.value > angle:
|
||||
if reverse:
|
||||
sun_time -= 1 * u.min
|
||||
else:
|
||||
sun_time += 1 * u.min
|
||||
i += 1
|
||||
if i > 60*24:
|
||||
print("\033[93m"
|
||||
+ "Warning: Sun did not reached the expected angle."
|
||||
+ "\033[0m")
|
||||
return time.Time("1970-01-01T12:00") # Error date
|
||||
return sun_time
|
||||
|
||||
# Locations
|
||||
available_obs = ["Observatoire astronomique de Strasbourg (ObAS)",
|
||||
"Observatoire de Haute-Provence (OHP)"]
|
||||
available_obs_short = ["ObAS",
|
||||
"OHP"]
|
||||
available_obs_lat = [coord.Latitude("""48d34m59s"""),
|
||||
coord.Latitude("""43d55m53s""")]
|
||||
available_obs_lon = [coord.Longitude("""07d46m07s"""),
|
||||
coord.Longitude("""5d42m45s""")]
|
||||
|
||||
|
||||
def main():
|
||||
try:
|
||||
print("\033[32m"+ "Please select a location:"+"\033[0m")
|
||||
for i in range(len(available_obs)):
|
||||
print("\033[32m"+ "\t{}. ".format(i+1)+"\033[0m"+available_obs[i]+".")
|
||||
selection = int(input("\033[32m"+ "Choice: "+"\033[0m"))-1
|
||||
if selection >= len(available_obs):
|
||||
selection = -1
|
||||
print("\033[93m"
|
||||
+ ("Warning: Location unavailable, "
|
||||
"selected the last one instead.")
|
||||
+ "\033[0m")
|
||||
except:
|
||||
selection = -1
|
||||
print("\033[91m"
|
||||
+ ("Error! Input value not recognized, "
|
||||
"selected the last location instead")
|
||||
+ "\033[0m")
|
||||
|
||||
obs_name = available_obs[selection]
|
||||
obs_short = available_obs_short[selection]
|
||||
obs_lat = available_obs_lat[selection]
|
||||
obs_lon = available_obs_lon[selection]
|
||||
obs_loc = coord.EarthLocation(lon=obs_lon, lat=obs_lat)
|
||||
|
||||
# Date and time
|
||||
try:
|
||||
obs_date = input("\033[32m"
|
||||
+ "Date of observation [YYYY-MM-DD]: "
|
||||
+ "\033[0m")
|
||||
test_date = time.Time(obs_date)
|
||||
except:
|
||||
obs_date = time.Time("1970-01-01", format="iso").now()
|
||||
obs_date = obs_date.to_string()[0:10]
|
||||
print("\033[91m"
|
||||
+ "Error! Input value not recognized, selected today instead"
|
||||
+ "\033[0m")
|
||||
else:
|
||||
del test_date
|
||||
print("\033[34m" + "Computing Sun position..." + "\033[0m")
|
||||
sun_frame = coord.AltAz(location=obs_loc)
|
||||
sun_time = time.Time(obs_date + " 12:00:00", scale="utc")
|
||||
|
||||
sun_set = get_sun(sun_time, sun_frame, 0)
|
||||
sun_civil = get_sun(sun_set, sun_frame, -6)
|
||||
sun_nautical = get_sun(sun_civil, sun_frame, -12)
|
||||
sun_astronomical = get_sun(sun_nautical, sun_frame, -18)
|
||||
|
||||
sun_time = time.Time(obs_date + " 12:00:00", scale="utc") + 1 * u.day
|
||||
|
||||
sun_rise = get_sun(sun_time, sun_frame, 0, reverse=True)
|
||||
sun_rcivil = get_sun(sun_rise, sun_frame, -6, reverse=True)
|
||||
sun_rnautical = get_sun(sun_rcivil, sun_frame, -12, reverse=True)
|
||||
sun_rastronomical = get_sun(sun_rnautical, sun_frame, -18, reverse=True)
|
||||
print("\033[34m" + "done!" + "\033[0m")
|
||||
|
||||
print("\033[36m"
|
||||
+ "Night time [UTC]:"
|
||||
+ "\033[0m")
|
||||
print("\033[36m"
|
||||
+ "\t === Evening === "
|
||||
+ "\033[0m")
|
||||
print("\033[36m"
|
||||
+ "\t sunset: "
|
||||
+ "\033[0m"
|
||||
+sun_set.to_string())
|
||||
print("\033[36m"
|
||||
+ "\t civil twilight: "
|
||||
+ "\033[0m"
|
||||
+sun_civil.to_string())
|
||||
print("\033[36m"
|
||||
+ "\t nautical twilight: "
|
||||
+ "\033[0m"
|
||||
+sun_nautical.to_string())
|
||||
print("\033[36m"
|
||||
+ "\tastronomical twilight: "
|
||||
+ "\033[0m"
|
||||
+sun_astronomical.to_string())
|
||||
print("\033[36m"
|
||||
+ "\t === Morning === "
|
||||
+ "\033[0m")
|
||||
print("\033[36m"
|
||||
+ "\tastronomical twilight: "
|
||||
+ "\033[0m"
|
||||
+sun_rastronomical.to_string())
|
||||
print("\033[36m"
|
||||
+ "\t nautical twilight: "
|
||||
+ "\033[0m"
|
||||
+sun_rnautical.to_string())
|
||||
print("\033[36m"
|
||||
+ "\t civil twilight: "
|
||||
+ "\033[0m"
|
||||
+sun_rcivil.to_string())
|
||||
print("\033[36m"
|
||||
+ "\t sunrise: "
|
||||
+ "\033[0m"
|
||||
+sun_rise.to_string())
|
||||
|
||||
|
||||
obs_begin = input("\033[32m"
|
||||
+ "Begin time of observation [hh:mm format; UTC]: "
|
||||
+ "\033[0m")
|
||||
obs_end = input("\033[32m"
|
||||
+ " End time of observation [hh:mm format; UTC]: "
|
||||
+ "\033[0m")
|
||||
|
||||
if obs_begin == "set" or obs_begin == "rise":
|
||||
obs_begin_time = sun_set
|
||||
elif obs_begin == "civil":
|
||||
obs_begin_time = sun_civil
|
||||
elif obs_begin == "nautical":
|
||||
obs_begin_time = sun_nautical
|
||||
elif obs_begin == "astronomical" or obs_begin == "auto" or obs_begin == "":
|
||||
obs_begin_time = sun_astronomical
|
||||
else:
|
||||
try:
|
||||
obs_begin_time = time.Time(obs_date + " " + obs_begin, scale="utc")
|
||||
except:
|
||||
obs_begin_time = sun_astronomical
|
||||
print("\033[91m"
|
||||
+ ("Error! Input value not recognized, "
|
||||
"selected astronomical twilight instead.")
|
||||
+ "\033[0m")
|
||||
|
||||
|
||||
if obs_end == "set" or obs_end == "rise":
|
||||
obs_end_time = sun_rise
|
||||
elif obs_end == "civil":
|
||||
obs_end_time = sun_rcivil
|
||||
elif obs_end == "nautical":
|
||||
obs_end_time = sun_rnautical
|
||||
elif obs_end == "astronomical" or obs_end == "auto" or obs_end == "":
|
||||
obs_end_time = sun_rastronomical
|
||||
else:
|
||||
try:
|
||||
obs_end_time = time.Time(obs_date + " " + obs_end, scale="utc")
|
||||
except:
|
||||
obs_end_time = sun_rastronomical
|
||||
print("\033[91m"
|
||||
+ ("Error! Input value not recognized, "
|
||||
"selected astronomical twilight instead.")
|
||||
+ "\033[0m")
|
||||
|
||||
if obs_end_time.value < obs_begin_time.value:
|
||||
obs_end_time += 1*u.day
|
||||
|
||||
obs_duration = obs_end_time - obs_begin_time
|
||||
|
||||
# Coordinates
|
||||
|
||||
print("\033[36m"
|
||||
+ "Observation constraints"
|
||||
+ "\033[0m")
|
||||
try:
|
||||
min_alt = coord.Angle(
|
||||
input("\033[32m"
|
||||
+ " Minimum altitude above horizon [deg]: "
|
||||
+ "\033[0m")
|
||||
+ "d")
|
||||
except:
|
||||
min_alt = coord.Angle(50 * u.deg)
|
||||
print("\033[91m"
|
||||
+ "Error! Input value not recognized, selected 50° instead."
|
||||
+ "\033[0m")
|
||||
try:
|
||||
min_dec = coord.Angle(
|
||||
input("\033[32m"
|
||||
+ " Minimum declination around the north [deg]: "
|
||||
+ "\033[0m")
|
||||
+ "d")
|
||||
except:
|
||||
min_dec = coord.Angle(30 * u.deg)
|
||||
print("\033[91m"
|
||||
+ "Error! Input value not recognized, selected 30° instead."
|
||||
+ "\033[0m")
|
||||
try:
|
||||
window_east = coord.Angle(
|
||||
input("\033[32m"
|
||||
+ "Observation window before crossing meridian [h]: "
|
||||
+ "\033[0m")
|
||||
+ " hours")
|
||||
except:
|
||||
window_east = coord.Angle(4 * u.h)
|
||||
print("\033[91m"
|
||||
+ "Error! Input value not recognized, selected 4h instead."
|
||||
+ "\033[0m")
|
||||
try:
|
||||
window_west = coord.Angle(
|
||||
input("\033[32m"
|
||||
+ " Observation window after crossing meridian [h]: "
|
||||
+ "\033[0m")
|
||||
+ " hours")
|
||||
except:
|
||||
window_west = coord.Angle(2 * u.h)
|
||||
print("\033[91m"
|
||||
+ "Error! Input value not recognized, selected 2h instead."
|
||||
+ "\033[0m")
|
||||
|
||||
obs_sky_rotation = coord.Angle(
|
||||
"{} hours".format(obs_duration.to_value("h")))\
|
||||
* (1*u.sday/u.day).decompose()
|
||||
|
||||
# * Date/time/location objects
|
||||
obs = time.Time(obs_begin_time, location=obs_loc)
|
||||
|
||||
# * Local sidereal time
|
||||
st = obs.sidereal_time("apparent") # ? apparent or absolute
|
||||
|
||||
# * Borders and corners coordinates
|
||||
dec_upper = (90*u.deg-min_dec).wrap_at(360*u.deg)
|
||||
dec_lower = (min_alt + obs_loc.lat - 90*u.deg).wrap_at(180*u.deg)
|
||||
a_west = (st + obs_sky_rotation + window_west).wrap_at(24*u.h)
|
||||
a_east = (st - window_east).wrap_at(24*u.h)
|
||||
|
||||
if a_east.degree <= a_west.degree:
|
||||
comp_symb = "&&"
|
||||
comp_text = "AND"
|
||||
else:
|
||||
comp_symb = "||"
|
||||
comp_text = "OR"
|
||||
# * Output
|
||||
print("")
|
||||
print("\033[36m"
|
||||
+ "Location: "
|
||||
+ "\033[0m"
|
||||
+ obs_name)
|
||||
print("\033[36m"
|
||||
+ "\tlat: "
|
||||
+ "\033[0m"
|
||||
+ obs_lat.to_string())
|
||||
print("\033[36m"
|
||||
+ "\tlon: "
|
||||
+ "\033[0m"
|
||||
+ obs_lon.to_string())
|
||||
print("\033[36m"
|
||||
+ "Observation"
|
||||
+ "\033[0m")
|
||||
print("\033[36m"
|
||||
+ "\tbegin date and time: "
|
||||
+ "\033[0m"
|
||||
+ obs_begin_time.to_string())
|
||||
print("\033[36m"
|
||||
+ "\t end date and time: "
|
||||
+ "\033[0m"
|
||||
+ obs_end_time.to_string())
|
||||
print("\033[36m"
|
||||
+ "\tbegin sidereal time: "
|
||||
+ "\033[0m"
|
||||
+ st.to_string(unit=u.hour))
|
||||
print("\033[36m"
|
||||
+ "\t end sidereal time: "
|
||||
+ "\033[0m"
|
||||
+ (st + obs_sky_rotation).wrap_at(24*u.h).to_string(unit=u.hour))
|
||||
print("\033[36m"
|
||||
+ "\t sky rotation: "
|
||||
+ "\033[0m"
|
||||
+ obs_sky_rotation.to_string(unit=u.hour))
|
||||
print("\033[36m"
|
||||
+ "Sky coordinates window"
|
||||
+ "\033[0m")
|
||||
print("\033[36m"
|
||||
+ "\t east ra: "
|
||||
+ "\033[0m"
|
||||
+ a_east.to_string(unit=u.hour))
|
||||
print("\033[36m"
|
||||
+ "\t west ra: "
|
||||
+ "\033[0m"
|
||||
+ a_west.to_string(unit=u.hour))
|
||||
print("\033[36m"
|
||||
+ "\tupper dec: "
|
||||
+ "\033[0m"
|
||||
+ dec_upper.to_string(unit=u.degree))
|
||||
print("\033[36m"
|
||||
+ "\tlower dec: "
|
||||
+ "\033[0m"
|
||||
+ dec_lower.to_string(unit=u.degree))
|
||||
print("\033[36m"
|
||||
+ "Simbad query constraints: "
|
||||
+ "\033[0m"
|
||||
+ "WHERE (ra < {} {} ra > {}) AND (dec < {} AND dec > {}))".format(
|
||||
a_west.degree,
|
||||
comp_text,
|
||||
a_east.degree,
|
||||
dec_upper.degree,
|
||||
dec_lower.degree))
|
||||
print("\033[36m"
|
||||
+ "Vizier query constraints:"
|
||||
+ "\033[0m")
|
||||
print("\033[36m"
|
||||
+ "\tRA: "
|
||||
+ "\033[0m"
|
||||
+ "< "
|
||||
+ a_west.to_string(unit=u.hour, sep=":")
|
||||
+ " {} ".format(comp_symb)
|
||||
+ "> "
|
||||
+ a_east.to_string(unit=u.hour, sep=":"))
|
||||
print("\033[36m"
|
||||
+ "\tDE: "
|
||||
+ "\033[0m"
|
||||
+ "< "
|
||||
+ dec_upper.to_string(unit=u.degree, sep=":")
|
||||
+ " && "
|
||||
+ "> "
|
||||
+ dec_lower.to_string(unit=u.degree, sep=":"))
|
||||
|
||||
print("")
|
||||
print("\033[32m"
|
||||
+ "Write output to file ? [yes/no]"
|
||||
+ "\033[0m")
|
||||
write_output = input("\033[32m"
|
||||
+ "Choice: "
|
||||
+ "\033[0m").upper() in YES
|
||||
if write_output:
|
||||
try:
|
||||
with open("{}{}_{}.cfg".format(OUT_DIR, obs_date, obs_short),
|
||||
"w+") as file:
|
||||
file.write("# File generated with SCOPE_v2\n")
|
||||
file.write("LOCATION: ")
|
||||
file.write(obs_short + "\n")
|
||||
file.write("LAT: ")
|
||||
file.write(obs_lat.to_string() + "\n")
|
||||
file.write("LON: ")
|
||||
file.write(obs_lon.to_string() + "\n")
|
||||
file.write("SUN_SET: ")
|
||||
file.write(sun_set.to_string() + "\n")
|
||||
file.write("SUN_SET_CIVIL: ")
|
||||
file.write(sun_civil.to_string() + "\n")
|
||||
file.write("SUN_SET_NAUTICAL: ")
|
||||
file.write(sun_nautical.to_string() + "\n")
|
||||
file.write("SUN_SET_ASTRONOMICAL: ")
|
||||
file.write(sun_astronomical.to_string() + "\n")
|
||||
file.write("SUN_RISE_ASTRONOMICAL: ")
|
||||
file.write(sun_rastronomical.to_string() + "\n")
|
||||
file.write("SUN_RISE_NAUTICAL: ")
|
||||
file.write(sun_rnautical.to_string() + "\n")
|
||||
file.write("SUN_RISE_CIVIL: ")
|
||||
file.write(sun_rcivil.to_string() + "\n")
|
||||
file.write("SUN_RISE: ")
|
||||
file.write(sun_rise.to_string() + "\n")
|
||||
file.write("OBS_BEGIN: ")
|
||||
file.write(obs_begin_time.to_string() + "\n")
|
||||
file.write("OBS_END: ")
|
||||
file.write(obs_end_time.to_string() + "\n")
|
||||
file.write("ST_BEGIN: ")
|
||||
file.write(st.to_string(unit=u.hour) + "\n")
|
||||
file.write("ST_END: ")
|
||||
file.write((st + obs_sky_rotation).wrap_at(
|
||||
24*u.h).to_string(unit=u.hour) + "\n")
|
||||
file.write("WINDOW_EAST: ")
|
||||
file.write(a_east.to_string(unit=u.hour) + "\n")
|
||||
file.write("WINDOW_WEST: ")
|
||||
file.write(a_west.to_string(unit=u.hour) + "\n")
|
||||
file.write("WINDOW_UPPER: ")
|
||||
file.write(dec_upper.to_string(unit=u.degree) + "\n")
|
||||
file.write("WINDOW_LOWER: ")
|
||||
file.write(dec_lower.to_string(unit=u.degree) + "\n")
|
||||
print("\033[34m"
|
||||
+ "File saved in the {} directory ".format(OUT_DIR)
|
||||
+ "with the name {}_{}.cfg".format(obs_date, obs_short)
|
||||
+ "\033[0m")
|
||||
print("\033[34m" + "done!" + "\033[0m")
|
||||
except:
|
||||
print("\033[91m"
|
||||
+ "Error! File cannot be save in the output directory."
|
||||
+ "\033[0m")
|
||||
return 0
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
617
Source/astrobs_v1_toolbox.py
Normal file
617
Source/astrobs_v1_toolbox.py
Normal file
@@ -0,0 +1,617 @@
|
||||
"""
|
||||
* Astrobs Toolbox
|
||||
* Version 1 - 2024-09-21
|
||||
@ Yaël Moussouni
|
||||
@ Observatory of Strasbourg
|
||||
"""
|
||||
import numpy as np
|
||||
import astropy.time as time
|
||||
import astropy.coordinates as coord
|
||||
import astropy.units as u
|
||||
from astropy.table import Table
|
||||
from astroquery.simbad import Simbad
|
||||
|
||||
SPECIAL = ["CALIBRATION", "IGNORE"]
|
||||
YES = ["YES", "Y", "1"]
|
||||
ALL = ["ALL", "*"]
|
||||
DONE = ["DONE", "OK"]
|
||||
QUIT = ["QUIT", "EXIT", "Q"]
|
||||
CANCEL = ["CANCEL", "BACK"]
|
||||
WRITE = ["WRITE", "SAVE"]
|
||||
READ = ["READ", "OPEN"]
|
||||
CALIB = ["CALIB", "CALIBRATION"]
|
||||
SIMBAD = ["SIMBAD", "OBJECT"]
|
||||
REGION = ["SEARCH", "REGION"]
|
||||
ST = ["SIDEREAL", "ST"]
|
||||
SEQ = ["SEQUENCE", "SEQ"]
|
||||
CHECK = ["CHECK"]
|
||||
OUT_DIR = "./Output/"
|
||||
DEFAULT_CONFIG = """# Default config
|
||||
LOCATION: None
|
||||
LAT: 0d00m00s
|
||||
LON: 0d00m00s
|
||||
SUN_SET: 1970-01-01 12:00:00.000
|
||||
SUN_SET_CIVIL: 1970-01-01 12:00:00.000
|
||||
SUN_SET_NAUTICAL: 1970-01-01 12:00:00.000
|
||||
SUN_SET_ASTRONOMICAL: 1970-01-01 12:00:00.000
|
||||
SUN_RISE_ASTRONOMICAL: 1970-01-01 12:00:00.000
|
||||
SUN_RISE_NAUTICAL: 1970-01-01 12:00:00.000
|
||||
SUN_RISE_CIVIL: 1970-01-01 12:00:00.000
|
||||
SUN_RISE: 1970-01-01 12:00:00.000
|
||||
OBS_BEGIN: 1970-01-01 12:00:00.000
|
||||
OBS_END: 1970-01-01 12:00:00.000
|
||||
ST_BEGIN: 12h00m0s
|
||||
ST_END: 12h00m00s
|
||||
WINDOW_EAST: 0h00m00s
|
||||
WINDOW_WEST: 23h59m59s
|
||||
WINDOW_UPPER: 90d00m00s
|
||||
WINDOW_LOWER: -90d00m00s
|
||||
"""
|
||||
|
||||
COLS = ["seq",
|
||||
"name",
|
||||
"main_id",
|
||||
"ra",
|
||||
"dec",
|
||||
"n_exp",
|
||||
"t_exp",
|
||||
"st_begin",
|
||||
"st_end",
|
||||
"notes"]
|
||||
|
||||
UNITS = ["", # SEQ
|
||||
"", # NAME
|
||||
"", # MAIN_ID
|
||||
"h", # RA
|
||||
"deg", # DEC
|
||||
"", # N_EXP
|
||||
"s", # T_EXP
|
||||
"h", # ST_BEGIN
|
||||
"deg", # ST_END
|
||||
""]
|
||||
|
||||
TYPES = [np.int_, # SEQ
|
||||
np.str_("<U64"), # NAME
|
||||
np.str_("<U64"), # MAIN_ID
|
||||
coord.angles.core.Angle, # RA
|
||||
coord.angles.core.Angle, # DEC
|
||||
np.int_, # N_EXP
|
||||
u.quantity.Quantity, # T_EXP
|
||||
coord.angles.core.Angle, # ST_BEGIN
|
||||
coord.angles.core.Angle, # ST_END
|
||||
np.str_("<U512")]
|
||||
|
||||
def check_window(table: Table,
|
||||
config: dict,
|
||||
i: np.int_ = None) -> np.bool_:
|
||||
def check_operation_(line, config):
|
||||
if line["main_id"] in SPECIAL:
|
||||
return True
|
||||
if line["ra"] == "":
|
||||
return True
|
||||
if line["dec"] == "":
|
||||
return True
|
||||
ra = coord.Angle(line["ra"])
|
||||
dec = coord.Angle(line["dec"])
|
||||
east = coord.Angle(config["WINDOW_EAST"])
|
||||
west = coord.Angle(config["WINDOW_WEST"])
|
||||
upper = coord.Angle(config["WINDOW_UPPER"])
|
||||
lower = coord.Angle(config["WINDOW_LOWER"])
|
||||
if east <= west:
|
||||
if ra < east or ra > west:
|
||||
print("\033[93m"
|
||||
+("Warning: The target {} "
|
||||
"is outside the observation window.").format(
|
||||
line["name"])
|
||||
+"\033[0m")
|
||||
return False
|
||||
else:
|
||||
if ra < east and ra > west:
|
||||
print("\033[93m"
|
||||
+("Warning: The target {} "
|
||||
"is outside the observation window.").format(
|
||||
line["name"])
|
||||
+"\033[0m")
|
||||
return False
|
||||
if dec < lower or dec > upper:
|
||||
print("\033[93m"
|
||||
+("Warning: The target {} "
|
||||
"is outside the observation window.").format(
|
||||
line["name"])
|
||||
+"\033[0m")
|
||||
return False
|
||||
return True
|
||||
if i == None:
|
||||
output = True
|
||||
for i in range(len(table)):
|
||||
val = check_operation_(table[i], config)
|
||||
if val == False:
|
||||
output = False
|
||||
return output
|
||||
else:
|
||||
return check_operation_(table[i], config)
|
||||
|
||||
def create_table(cols: list = COLS,
|
||||
units: list = UNITS,
|
||||
types: list = TYPES) -> Table:
|
||||
return Table(names=cols, dtype=types, units=units)
|
||||
|
||||
def select_obj(table: Table,
|
||||
line: Table,
|
||||
config: dict) -> Table:
|
||||
table_old = table.copy()
|
||||
for i in range(len(line)):
|
||||
while line[i]["name"] in line[:i]["name"]:
|
||||
line[i]["name"] = "*"+line[i]["name"]
|
||||
line.pprint_all()
|
||||
answer = "$"
|
||||
print("\033[32m"
|
||||
+ ("Select the name of the objects "
|
||||
"you want to observe (column \"NAME\"):")
|
||||
+ "\033[0m")
|
||||
while answer.upper() not in DONE + [""]:
|
||||
answer = input("\033[32m"
|
||||
+ "> "
|
||||
+ "\033[0m")
|
||||
print("\033[A"
|
||||
+ "\033[32m"
|
||||
+ "> "
|
||||
+ "\033[0m"
|
||||
+ "\033[C"*len(answer),
|
||||
end="")
|
||||
if answer.upper() in DONE + [""]:
|
||||
print("\033[34m"
|
||||
+ " - ok"
|
||||
+ "\033[0m")
|
||||
elif answer.upper() in CANCEL or answer.upper() in QUIT:
|
||||
i = 0
|
||||
while i < len(table):
|
||||
if table[i]["name"] not in table_old["name"]:
|
||||
table.remove_row(i)
|
||||
print("remove {}".format(i))
|
||||
else: i+= 1
|
||||
print("")
|
||||
print("\033[34m"
|
||||
+ "Operation canceled"
|
||||
+ "\033[0m")
|
||||
return table
|
||||
elif answer in ALL:
|
||||
for i in range(len(line)):
|
||||
name = line[i]["name"]
|
||||
while name in table["name"]:
|
||||
name = "*" + name
|
||||
table.add_row(line[i])
|
||||
table[-1]["name"] = name
|
||||
print("\033[34m"
|
||||
+ " - ok"
|
||||
+ "\033[0m")
|
||||
|
||||
elif answer in line["name"]:
|
||||
i = np.argwhere(line["name"] == answer)[0][0]
|
||||
name = line[i]["name"]
|
||||
while name in table["name"]:
|
||||
name = "*" + name
|
||||
table.add_row(line[i])
|
||||
table[-1]["name"] = name
|
||||
print("\033[34m"
|
||||
+ " - ok"
|
||||
+ "\033[0m")
|
||||
else:
|
||||
print("\033[34m"
|
||||
+ " - not found"
|
||||
+ "\033[0m")
|
||||
return table
|
||||
|
||||
def add_manual(table: Table,
|
||||
config: dict,
|
||||
seq: np.int_ = 0,
|
||||
name: np.str_ = "",
|
||||
main_id: np.str_ = "",
|
||||
ra: coord.angles.core.Angle = None,
|
||||
dec: coord.angles.core.Angle = None,
|
||||
n_exp: np.int_ = 0,
|
||||
t_exp: u.quantity.Quantity = 0*u.s,
|
||||
obj_begin: coord.angles.core.Angle = None,
|
||||
obj_end: coord.angles.core.Angle = None,
|
||||
notes: np.str_ = "") -> Table:
|
||||
|
||||
while name in table["name"]: name = "*" + name
|
||||
while seq > 0 and seq in table["seq"]: seq += 1
|
||||
|
||||
ra_str = ""
|
||||
dec_str = ""
|
||||
obj_begin_str = ""
|
||||
obj_end_str = ""
|
||||
|
||||
if type(ra) == coord.angles.core.Angle:
|
||||
ra_str = ra.to_string(unit=u.hour)
|
||||
elif type(ra) == str:
|
||||
ra_str = ra
|
||||
|
||||
if type(dec) == coord.angles.core.Angle:
|
||||
dec_str = dec.to_string(unit=u.degree)
|
||||
elif type(dec) == str:
|
||||
dec_str = dec
|
||||
|
||||
if type(obj_begin) == coord.angles.core.Angle:
|
||||
obj_begin_str = obj_begin.to_string()
|
||||
elif type(obj_begin) == str:
|
||||
obj_begin_str = obj_begin
|
||||
|
||||
if type(obj_end) == coord.angles.core.Angle:
|
||||
obj_end_str = obj_end.to_string()
|
||||
elif type(obj_end) == str:
|
||||
obj_end_str = obj_end
|
||||
|
||||
table.add_row([seq,
|
||||
name,
|
||||
main_id,
|
||||
ra_str,
|
||||
dec_str,
|
||||
n_exp,
|
||||
t_exp.to_value(u.s),
|
||||
obj_begin_str,
|
||||
obj_end_str,
|
||||
notes])
|
||||
check_window(table, config, -1)
|
||||
return table
|
||||
|
||||
def add_calib(table: Table,
|
||||
config: dict = None,
|
||||
seq: np.int_ = 0,
|
||||
name: np.str_ = "CALIB",
|
||||
n_exp: np.int_ = 0,
|
||||
t_exp: u.quantity.Quantity = 0*u.s,
|
||||
notes: np.str_ = "") -> Table:
|
||||
|
||||
while name in table["name"]:
|
||||
name = "*" + name
|
||||
|
||||
while seq > 0 and seq in table["seq"]:
|
||||
seq += 1
|
||||
|
||||
table.add_row([seq,
|
||||
name,
|
||||
"CALIBRATION",
|
||||
"",
|
||||
"",
|
||||
n_exp,
|
||||
t_exp.to_value(u.s),
|
||||
"",
|
||||
"",
|
||||
notes])
|
||||
check_window(table, config, -1)
|
||||
return table
|
||||
|
||||
def add_simbad(table: Table,
|
||||
obj: Table,
|
||||
config: dict,
|
||||
seq: np.int_ = 0,
|
||||
name: np.str_ = "",
|
||||
n_exp: np.int_ = 0,
|
||||
t_exp: u.quantity.Quantity = 0*u.s,
|
||||
obj_begin: coord.angles.core.Angle = None,
|
||||
obj_end: coord.angles.core.Angle = None,
|
||||
notes: np.str_ = "") -> Table:
|
||||
for i in range(len(obj)):
|
||||
main_id = obj["main_id"][i]
|
||||
|
||||
if name == "":
|
||||
name = main_id
|
||||
|
||||
while name in table["name"]:
|
||||
name = "*" + name
|
||||
|
||||
while seq > 0 and seq in table["seq"]:
|
||||
seq += 1
|
||||
|
||||
ra = coord.Angle(obj["ra"][i], unit=u.degree)
|
||||
dec = coord.Angle(obj["dec"][i], unit=u.degree)
|
||||
ra_str = ra.to_string(unit=u.hour)
|
||||
dec_str = dec.to_string(unit=u.degree)
|
||||
obj_begin_str = ""
|
||||
obj_end_str = ""
|
||||
|
||||
if type(obj_begin) == coord.angles.core.Angle:
|
||||
obj_begin_str = obj_begin.to_string()
|
||||
elif type(obj_begin) == str:
|
||||
obj_begin_str = obj_begin
|
||||
|
||||
if type(obj_end) == coord.angles.core.Angle:
|
||||
obj_end_str = obj_end.to_string()
|
||||
elif type(obj_end) == str:
|
||||
obj_end_str = obj_end
|
||||
|
||||
table.add_row([seq,
|
||||
name,
|
||||
main_id,
|
||||
ra_str,
|
||||
dec_str,
|
||||
n_exp,
|
||||
t_exp.to_value(u.s),
|
||||
obj_begin_str,
|
||||
obj_end_str,
|
||||
notes])
|
||||
name = ""
|
||||
check_window(table, config)
|
||||
return table
|
||||
|
||||
def read_cfg(filename:str,
|
||||
directory:str = OUT_DIR,
|
||||
extension:str = ".cfg") -> dict:
|
||||
if directory[-1] != "/": directory += "/"
|
||||
if extension[0] != ".": extension = "." + extension
|
||||
try:
|
||||
with open(directory+filename+extension, "r") as file:
|
||||
params_index = []
|
||||
params_value = []
|
||||
for line in file.readlines():
|
||||
if len(line) > 0:
|
||||
if line[0] != "#":
|
||||
params = line.strip().split(": ")
|
||||
params_index.append(params[0])
|
||||
params_value.append(params[1])
|
||||
config = {params_index[i]:params_value[i]
|
||||
for i in range(len(params_index))}
|
||||
except:
|
||||
print("\033[93m"
|
||||
+ "Warning: No configuration file, using default."
|
||||
+ "\033[0m")
|
||||
if True:
|
||||
file = DEFAULT_CONFIG # file is a string
|
||||
params_index = []
|
||||
params_value = []
|
||||
for line in file.split("\n"):
|
||||
if len(line) > 0:
|
||||
if line[0] != "#":
|
||||
params = line.strip().split(": ")
|
||||
params_index.append(params[0])
|
||||
params_value.append(params[1])
|
||||
config = {params_index[i]:params_value[i]
|
||||
for i in range(len(params_index))}
|
||||
|
||||
|
||||
|
||||
|
||||
return config
|
||||
|
||||
def set_st_window(table, config):
|
||||
before = coord.Angle(config["ST_BEGIN"]) \
|
||||
- coord.Angle(config["WINDOW_EAST"])
|
||||
after = coord.Angle(config["WINDOW_WEST"]) \
|
||||
- coord.Angle(config["ST_END"])
|
||||
for i in range(len(table)):
|
||||
if not table[i]["main_id"] in SPECIAL:
|
||||
table[i]["st_begin"] = (coord.Angle(table[i]["ra"]) \
|
||||
- before).wrap_at(24*u.h).to_string()
|
||||
table[i]["st_end"] = (coord.Angle(table[i]["ra"]) \
|
||||
+ after).wrap_at(24*u.h).to_string()
|
||||
else:
|
||||
continue
|
||||
return table
|
||||
|
||||
def set_seq(table, config, stack=False):
|
||||
window_east = coord.Angle(config["WINDOW_EAST"]).degree
|
||||
window_west = coord.Angle(config["WINDOW_WEST"]).degree
|
||||
dupl_seq = np.argwhere(
|
||||
np.unique(table["seq"], return_counts=True)[1] > 1).flatten()
|
||||
index_dupl = np.argwhere(np.any(
|
||||
[table["seq"] == dupl_seq[i]
|
||||
for i in range(len(dupl_seq))], axis=0)).flatten()
|
||||
index_zero = np.argwhere(
|
||||
table["seq"] == 0).flatten()
|
||||
index_special = np.argwhere(np.any(
|
||||
[table["main_id"] == SPECIAL[i]
|
||||
for i in range(len(SPECIAL))], axis=0)).flatten()
|
||||
index_nora = np.argwhere(table["ra"] == "").flatten()
|
||||
index_yes = np.union1d(index_dupl, index_zero)
|
||||
index_no = np.union1d(index_special, index_nora)
|
||||
index = np.setdiff1d(index_yes, index_no)
|
||||
order_table = table[index]
|
||||
table["seq"][index] = np.full(len(index), 0)
|
||||
order_table.add_columns(
|
||||
[coord.Angle(order_table["ra"]).degree],
|
||||
names=["RA_DEG"])
|
||||
if window_east <= window_west:
|
||||
order_table.sort("RA_DEG")
|
||||
else :
|
||||
w1 = np.argwhere(
|
||||
order_table["RA_DEG"] > (window_east + window_west)/2)
|
||||
w2 = np.argwhere(
|
||||
order_table["RA_DEG"] <= (window_east + window_west)/2)
|
||||
order = np.zeros(len(order_table))
|
||||
order[w1] = 1
|
||||
order[w2] = 2
|
||||
order_table.add_columns(
|
||||
[order],
|
||||
names=["ORDER"])
|
||||
order_table.sort("RA_DEG")
|
||||
order_table.sort("ORDER")
|
||||
i = 0
|
||||
seq = 0
|
||||
while i < len(order_table):
|
||||
while seq in table["seq"]:
|
||||
seq += 1
|
||||
order_table["seq"][i] = seq
|
||||
i += 1
|
||||
seq += 1
|
||||
table[index] = order_table
|
||||
table.sort("seq")
|
||||
while table[0]["seq"] < 0:
|
||||
table.add_row(table[0])
|
||||
table.remove_row(0)
|
||||
if stack:
|
||||
i = 0
|
||||
while table[i]["seq"] >= 0:
|
||||
table[i]["seq"] = i + 1
|
||||
i += 1
|
||||
i = -1
|
||||
while table[i]["seq"] < 0:
|
||||
table[i]["seq"] = i
|
||||
i -= 1
|
||||
return table
|
||||
|
||||
def swap(table, name_1, name_2):
|
||||
if name_1 not in table["name"] or name_2 not in table["name"]:
|
||||
print("\033[93m"
|
||||
+"Warning: {} or {} cannot be found in the table.".format(
|
||||
name_1,
|
||||
name_2)
|
||||
+"\033[0m")
|
||||
return table
|
||||
i = 0
|
||||
seq_1 = 0
|
||||
seq_2 = 0
|
||||
while table[i]["name"] not in [name_1, name_2]:
|
||||
i += 1
|
||||
seq_1 = table[i]["seq"]
|
||||
table.add_row(table[i])
|
||||
j = i + 1
|
||||
while table[j]["name"] not in [name_1, name_2]:
|
||||
j += 1
|
||||
seq_2 = table[j]["seq"]
|
||||
table[i] = table[j]
|
||||
table[j] = table[-1]
|
||||
table[i]["seq"] = seq_1
|
||||
table[j]["seq"] = seq_2
|
||||
table.remove_row(-1)
|
||||
return table
|
||||
|
||||
def write_table(table,
|
||||
config,
|
||||
filename=None,
|
||||
extension=".xml",
|
||||
format_="votable",
|
||||
directory=OUT_DIR):
|
||||
data = table.copy()
|
||||
if directory[-1] != "/": directory += "/"
|
||||
if extension[0] != ".": extension = "." + extension
|
||||
if filename == None:
|
||||
filename = "{}_{}".format(config["OBS_BEGIN"][:10],
|
||||
config["LOCATION"])
|
||||
try:
|
||||
data.write(directory + filename + extension,
|
||||
format=format_,
|
||||
overwrite=True)
|
||||
print("\033[34m"
|
||||
+ "File saved in the {} directory ".format(directory)
|
||||
+ "with the name {}".format(filename+extension)
|
||||
+ "\033[0m")
|
||||
except Exception as e:
|
||||
print("\033[91m"
|
||||
+ ("Error! File cannot be saved "
|
||||
"in the {} directory").format(directory)
|
||||
+ "\033[0m")
|
||||
|
||||
def read_table(filename,
|
||||
table,
|
||||
config,
|
||||
extension=".xml",
|
||||
format_="votable",
|
||||
directory=OUT_DIR):
|
||||
if directory[-1] != "/": directory += "/"
|
||||
if extension[0] != ".": extension = "." + extension
|
||||
if filename == None:
|
||||
filename = "{}_{}".format(config["OBS_BEGIN"][:10],
|
||||
config["LOCATION"])
|
||||
try:
|
||||
data = Table.read(directory + filename + extension,
|
||||
format=format_)
|
||||
for i in range(len(data)):
|
||||
line = data[i]
|
||||
if "name" in line.columns: name = line["name"]
|
||||
else: name = "{}_{}".format(filename, i)
|
||||
if "main_id" in line.columns: main_id = line["main_id"]
|
||||
else: main_id = name
|
||||
if "ra" in line.columns:
|
||||
if "h" not in str(line["ra"]):
|
||||
ra = coord.Angle("{}d".format(line["ra"])).to_string(unit=u.hour)
|
||||
else: ra = line["ra"]
|
||||
else: ra = ""
|
||||
if "dec" in line.columns:
|
||||
if "d" not in str(line["dec"]):
|
||||
dec = coord.Angle("{}d".format(line["dec"])).to_string(unit=u.degree)
|
||||
else: dec = line["dec"]
|
||||
else: dec = ""
|
||||
if "n_exp" in line.columns: n_exp = line["n_exp"]
|
||||
else: n_exp = 0
|
||||
if "t_exp" in line.columns: t_exp = line["t_exp"]
|
||||
else: t_exp = 0.
|
||||
if "st_begin" in line.columns: st_begin = line["st_begin"]
|
||||
else: st_begin = ""
|
||||
if "st_end" in line.columns: st_end = line["st_end"]
|
||||
else: st_end = ""
|
||||
if "notes" in line.columns: notes = line["notes"]
|
||||
else: notes = ""
|
||||
while name in table["name"]:
|
||||
name = "*" + name
|
||||
|
||||
for col in line.columns:
|
||||
if col not in COLS:
|
||||
notes += " & {}: {}".format(col, line[col])
|
||||
|
||||
table.add_row({"seq": 0,
|
||||
"name": name,
|
||||
"main_id": main_id,
|
||||
"ra": ra,
|
||||
"dec": dec,
|
||||
"n_exp": n_exp,
|
||||
"t_exp": t_exp,
|
||||
"st_begin": st_begin,
|
||||
"st_end": st_end,
|
||||
"notes": notes})
|
||||
|
||||
except Exception as e:
|
||||
print("\033[91m"
|
||||
+ ("Error! File cannot be loaded "
|
||||
"from the {} directory").format(directory)
|
||||
+ "\033[0m")
|
||||
return create_table()
|
||||
return table
|
||||
|
||||
def resolve_input(text, table, config):
|
||||
swap_table = create_table()
|
||||
args = text.split(" ")
|
||||
try:
|
||||
if args[0].upper() in DONE + ["", " "]:
|
||||
None
|
||||
elif args[0].upper() in WRITE: # TODO ajouter args
|
||||
write_table(table, config)
|
||||
elif args[0].upper() in READ: # TODO ajouter args
|
||||
read_table(args[1], swap_table, config)
|
||||
select_obj(table, swap_table, config)
|
||||
elif args[0].upper() in SIMBAD:
|
||||
name = " ".join(args[1:])
|
||||
obj = Simbad.query_object(name)
|
||||
swap_table = add_simbad(swap_table, obj, config, name=name)
|
||||
select_obj(table, swap_table, config)
|
||||
elif args[0].upper() in REGION:
|
||||
name = " ".join(args[1]+" "+args[2])
|
||||
region = coord.SkyCoord(ra=args[1], dec=args[2])
|
||||
radius = coord.Angle(args[3])
|
||||
obj = Simbad.query_region(region, radius)
|
||||
swap_table = add_simbad(swap_table, obj, config)
|
||||
select_obj(table, swap_table, config)
|
||||
elif args[0].upper() in CALIB: # TODO ajouter args
|
||||
add_calib(table, config)
|
||||
elif args[0].upper() in ST:
|
||||
set_st_window(table, config)
|
||||
elif args[0].upper() in SEQ: # FIXME marche bien une fois mais pas deux ??
|
||||
set_seq(table, config)
|
||||
elif args[0].upper() in CHECK:
|
||||
check_window(table, config)
|
||||
elif args[0].upper() in QUIT:
|
||||
None
|
||||
else:
|
||||
print("\033[93m"
|
||||
+ ("Warning: {} is not recognized "
|
||||
"as a valid keyword.").format(args[0])
|
||||
+ "\033[0m")
|
||||
except Exception as e:
|
||||
print("\033[91m"
|
||||
+ "Error: Your operation is not recognized".format(args[0])
|
||||
+ "\033[0m")
|
||||
return table
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user