suppressPackageStartupMessages({
library(CAVAanalytics)
library(sf)
library(readr)
library(arrow)
library(data.table)
library(jsonlite)
library(dplyr)
library(lubridate)
library(purrr)
library(tibble)
})Weather Data Retrieval
Library importation
Load the necessary packages.
Sentinel site definition
Define the sentinel sites used in the study, including country, climatic zone, geographic coordinates, and the start and end dates of the local growing season. A unique identifier is assigned to each site, and the planting/harvest dates are converted to day-of-year values for later use in phenological analyses.
sentinels <- tibble::tribble(
~country, ~zone_gradient, ~site, ~lat, ~lon, ~start_month, ~start_day, ~end_month, ~end_day,
"Brazil", "Western RS (continental)", "Santa Rosa (BR)", -27.87, -54.48, 8L, 15L, 9L, 15L,
"Brazil", "Central–North RS (reference)", "Passo Fundo (BR)", -28.26, -52.40, 9L, 15L, 10L, 15L,
"Brazil", "Eastern RS (highlands)", "Vacaria (BR)", -28.51, -50.94, 10L, 1L, 10L, 31L,
"Uruguay", "Northern Litoral", "Artigas-Salto (UY)", -30.40, -56.50, 10L, 1L, 10L, 31L,
"Argentina","Zone II (North Pampas)", "Pergamino (AR)", -33.89, -60.57, 10L, 1L, 10L, 31L,
"Argentina","Zone IV (South Pampas)", "Tandil (AR)", -37.32, -59.13, 11L, 1L, 11L, 30L
) %>%
mutate(sentinel_id = row_number()) %>%
select(sentinel_id, everything()) %>%
mutate(
start_doy = yday(make_date(2001, start_month, start_day)),
end_doy = yday(make_date(2001, end_month, end_day))
)
DT::datatable(
sentinels,
options = list(
pageLength = 10,
scrollX = TRUE
)
)Data Retrieval
Retrieve daily weather data for each sentinel site using the Open-Meteo archive API. Data are downloaded in multi-year chunks to reduce request size and improve stability, with retry logic and pauses included to handle server throttling. The extracted variables are daily minimum and maximum temperature, mean relative humidity, and precipitation, from which mean daily temperature is calculated. The results are then combined across sites and exported as a csv.
.om_fetch_daily <- function(url, max_tries = 12, base_sleep = 2) {
for (i in seq_len(max_tries)) {
out <- tryCatch(jsonlite::fromJSON(url), error = function(e) e)
if (!inherits(out, "error")) return(out)
msg <- conditionMessage(out)
if (grepl("429", msg)) {
if (i >= 5) {
# HARD cooldown after repeated 429
wait <- 180 + runif(1, 0, 30)
message(sprintf("Hard Open-Meteo throttle. Cooling down %.0f seconds...", wait))
} else {
wait <- base_sleep * (2^(i - 1)) + runif(1, 0, 1)
message(sprintf("Open-Meteo 429 (try %d/%d). Sleeping %.1f s...",
i, max_tries, wait))
}
Sys.sleep(wait)
next
}
# Other errors
wait <- 2 + runif(1)
message(sprintf("Open-Meteo error (try %d/%d): %s | Sleeping %.1f s...",
i, max_tries, msg, wait))
Sys.sleep(wait)
}
warning("Open-Meteo failed after retries — skipping this chunk.")
return(NULL)
}
get_era5_openmeteo_daily_chunked <- function(lat, lon,
start_date = as.Date("1981-01-01"),
end_date = Sys.Date() - 6,
timezone = "UTC",
chunk_years = 3,
pause_between_calls = 0.2) {
base <- "https://archive-api.open-meteo.com/v1/archive"
daily_vars <- paste(
c("temperature_2m_min", "temperature_2m_max", "relative_humidity_2m_mean", "precipitation_sum"),
collapse = ","
)
# sequência de janelas [start, end] por chunk_years
starts <- seq.Date(start_date, end_date, by = paste0(chunk_years, " years"))
parts <- vector("list", length(starts))
for (k in seq_along(starts)) {
s <- starts[[k]]
e <- min(end_date, (s %m+% years(chunk_years)) - days(1))
url <- paste0(
base,
"?latitude=", lat,
"&longitude=", lon,
"&start_date=", format(s, "%Y-%m-%d"),
"&end_date=", format(e, "%Y-%m-%d"),
"&daily=", daily_vars,
"&timezone=", timezone
)
js <- .om_fetch_daily(url)
if (is.null(js$daily$time)) {
stop("Open-Meteo returned no daily data for chunk: ", s, " to ", e)
}
parts[[k]] <- tibble(
date = as.Date(js$daily$time),
T2M_MIN = as.numeric(js$daily$temperature_2m_min),
T2M_MAX = as.numeric(js$daily$temperature_2m_max),
RH2M = as.numeric(js$daily$relative_humidity_2m_mean),
PRECTOTCORR = as.numeric(js$daily$precipitation_sum)
) %>%
mutate(T2M = (T2M_MIN + T2M_MAX) / 2) %>%
select(date, T2M, RH2M, PRECTOTCORR)
if (pause_between_calls > 0) Sys.sleep(pause_between_calls)
}
bind_rows(parts) %>%
distinct(date, .keep_all = TRUE) %>%
arrange(date)
}
download_all_era5_openmeteo <- function(sentinels,
start_date = as.Date("1981-01-01"),
end_date = Sys.Date() - 6,
timezone = "UTC",
chunk_years = 3,
pause_between_chunks = 0.2,
pause_between_sites = 1.0,
cache_dir = NULL) {
if (!is.null(cache_dir)) dir.create(cache_dir, showWarnings = FALSE, recursive = TRUE)
purrr::pmap_dfr(
list(sentinels$sentinel_id, sentinels$country, sentinels$site,
sentinels$lat, sentinels$lon),
function(sentinel_id, country, site, lat, lon) {
message(sprintf("Open-Meteo/ERA5: %s | %s (%.2f, %.2f)", country, site, lat, lon))
cache_file <- NULL
if (!is.null(cache_dir)) {
cache_file <- file.path(cache_dir, sprintf("openmeteo_era5_%02d_%s_%s.rds",
sentinel_id,
gsub("[^A-Za-z0-9]+","_", country),
gsub("[^A-Za-z0-9]+","_", site)))
}
if (!is.null(cache_file) && file.exists(cache_file)) {
dat <- readRDS(cache_file)
} else {
dat <- get_era5_openmeteo_daily_chunked(
lat = lat, lon = lon,
start_date = start_date,
end_date = end_date,
timezone = timezone,
chunk_years = chunk_years,
pause_between_calls = pause_between_chunks
)
if (!is.null(cache_file)) saveRDS(dat, cache_file)
}
if (pause_between_sites > 0) Sys.sleep(pause_between_sites)
dat %>%
mutate(
sentinel_id = sentinel_id,
country = country,
site = site,
lat = lat,
lon = lon,
year = lubridate::year(date),
mon = lubridate::month(date)
)
}
)
}
download_all_era5_openmeteo <- function(sentinels,
start_date = as.Date("1981-01-01"),
end_date = Sys.Date() - 6,
timezone = "UTC",
chunk_years = 3,
pause_between_chunks = 0.2,
pause_between_sites = 1.0,
cache_dir = NULL) {
if (!is.null(cache_dir)) dir.create(cache_dir, showWarnings = FALSE, recursive = TRUE)
purrr::pmap_dfr(
list(sentinels$sentinel_id, sentinels$country, sentinels$site,
sentinels$lat, sentinels$lon),
function(sentinel_id, country, site, lat, lon) {
message(sprintf("Open-Meteo/ERA5: %s | %s (%.2f, %.2f)", country, site, lat, lon))
cache_file <- NULL
if (!is.null(cache_dir)) {
cache_file <- file.path(cache_dir, sprintf("openmeteo_era5_%02d_%s_%s.rds",
sentinel_id,
gsub("[^A-Za-z0-9]+","_", country),
gsub("[^A-Za-z0-9]+","_", site)))
}
if (!is.null(cache_file) && file.exists(cache_file)) {
dat <- readRDS(cache_file)
} else {
dat <- get_era5_openmeteo_daily_chunked(
lat = lat, lon = lon,
start_date = start_date,
end_date = end_date,
timezone = timezone,
chunk_years = chunk_years,
pause_between_calls = pause_between_chunks
)
if (!is.null(cache_file)) saveRDS(dat, cache_file)
}
if (pause_between_sites > 0) Sys.sleep(pause_between_sites)
dat %>%
mutate(
sentinel_id = sentinel_id,
country = country,
site = site,
lat = lat,
lon = lon,
year = lubridate::year(date),
mon = lubridate::month(date)
)
}
)
}
df_daily <- download_all_era5_openmeteo(
sentinels,
start_date = as.Date("1981-01-01"),
end_date = Sys.Date() - 6,
timezone = "UTC",
chunk_years = 3,
pause_between_chunks = 1.5,
pause_between_sites = 8.0,
cache_dir = "cache_openmeteo"
)
write.csv(df_daily, "df_daily_era5.csv")Convert the sentinel site table into a spatial object and use it to extract bias-corrected CORDEX-CORE climate projections over the SAM-22 domain. Projections are obtained for relative humidity, daily maximum temperature, and daily minimum temperature for the period 2023–2055. Each extracted dataset is saved as an RDS file for subsequent processing.
Sites_sf <- sentinels %>%
mutate(name = case_when(
grepl("Passo Fundo", site) ~ "Passo_Fundo",
grepl("Vacaria", site) ~ "Vacaria",
grepl("Santa Rosa", site) ~ "Santa_Rosa",
grepl("Artigas", site) ~ "Artigas-Salto",
grepl("Tandil", site) ~ "Tandil",
grepl("Pergamino", site) ~ "Pergamino"
)) %>%
select(name, lon, lat) %>%
st_as_sf(coords = c("lon", "lat"), crs = 4326)
remote.data <- load_data(
path.to.data = "CORDEX-CORE-BC",
country = Sites_sf,
variable = "hurs",
years.proj = 2023:2055,
domain = "SAM-22",
buffer = 0.2
)
saveRDS(remote.data, file = "CORDEX_Data/remote_data_hurs_SAM22_2023_2055.rds")
remote.data <- load_data(
path.to.data = "CORDEX-CORE-BC",
country = Sites_sf,
variable = "tasmax",
years.proj = 2023:2055,
domain = "SAM-22",
buffer = 0.2
)
saveRDS(remote.data, file = "CORDEX_Data/remote_data_tasmax_SAM22_2023_2055.rds")
remote.data <- load_data(
path.to.data = "CORDEX-CORE-BC",
country = Sites_sf,
variable = "tasmin",
years.proj = 2023:2055,
domain = "SAM-22",
buffer = 0.2
)
saveRDS(remote.data, file = "CORDEX_Data/remote_data_tasmin_SAM22_2023_2055.rds")Extract Data
Read the saved RDS files and extract the climate arrays for each model experiment and ensemble member. The data are reshaped from a four-dimensional array into a long-format table containing member, time, latitude, longitude, and variable values. Each ensemble member is exported separately as a compressed Parquet file to facilitate efficient downstream analysis.
in_dir <- "CORDEX_Data"
out_root <- "CORDEX_Data/Parquet"
rds_files <- list.files(
in_dir,
pattern = "\\.rds$",
full.names = TRUE
)
for (rds_file in rds_files) {
rds_name <- tools::file_path_sans_ext(basename(rds_file))
cat("\n==============================\n")
cat("Processing:", rds_name, "\n")
cat("==============================\n")
my_data <- readr::read_rds(rds_file)
exp_tbl <- my_data[[1]]
for (eri in seq_len(nrow(exp_tbl))) {
exp_name <- as.character(exp_tbl$experiment[eri])
cat(sprintf("\n=== experiment: %s (%d/%d) ===\n", exp_name, eri, nrow(exp_tbl)))
mm <- exp_tbl$models_mbrs[[eri]]
arr <- mm$Data
lat_vec <- mm$xyCoords$y
lon_vec <- mm$xyCoords$x
time_vec <- mm$Dates$start
members <- mm$Members
d <- dim(arr)
n_member <- d[1]
n_time <- d[2]
n_lat <- d[3]
n_lon <- d[4]
stopifnot(
length(lat_vec) == n_lat,
length(lon_vec) == n_lon,
length(time_vec) == n_time,
length(members) == n_member
)
members_safe <- gsub("[^A-Za-z0-9_\\-]", "_", members)
for (mem_i in seq_len(n_member)) {
member_label <- members[mem_i]
member_safe <- members_safe[mem_i]
out_dir <- file.path(out_root, rds_name, exp_name)
dir.create(out_dir, recursive = TRUE, showWarnings = FALSE)
out_file <- file.path( out_dir, sprintf("%s__member_%s.parquet", rds_name, member_safe))
cat(sprintf(" writing member %d/%d → %s\n", mem_i, n_member, out_file))
chunks <- vector("list", n_time)
for (ti in seq_len(n_time)) {
mat <- arr[mem_i, ti, , , drop = TRUE]
if (!is.matrix(mat))
mat <- matrix(mat, nrow = n_lat, ncol = n_lon)
dt <- CJ(lat_ix = seq_len(n_lat),
lon_ix = seq_len(n_lon))
dt[, value := as.vector(mat)]
dt[, `:=`(
time = time_vec[ti],
lat = lat_vec[lat_ix],
lon = lon_vec[lon_ix]
)]
chunks[[ti]] <- dt[, .(time, lat, lon, value)]
}
member_dt <- rbindlist(chunks)
member_dt[, member := member_label]
setcolorder(member_dt, c("member", "time", "lat", "lon", "value"))
if (is.character(member_dt$time)) {
member_dt[, time := as.POSIXct(time, tz = "UTC")]
}
write_parquet(
member_dt,
sink = out_file,
compression = "zstd"
)
rm(chunks, member_dt); gc()
}
}
rm(my_data, exp_tbl); gc()
cat("\nAll done. Files written to:\n", out_root, "\n")
}