Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -1641,22 +1641,50 @@ set -euo pipefail
# Log-groomer configuration; every knob is overridable via environment.
readonly DIRECTORY="${AIRFLOW_HOME:-/usr/local/airflow}"
readonly RETENTION="${AIRFLOW__LOG_RETENTION_DAYS:-15}"
readonly FREQUENCY="${AIRFLOW__LOG_CLEANUP_FREQUENCY_MINUTES:-15}"
readonly MAX_PERCENT="${AIRFLOW__LOG_MAX_SIZE_PERCENT:-0}"

trap "exit" INT TERM

# An explicit byte limit wins; otherwise derive one from MAX_PERCENT of the
# filesystem hosting the logs directory. 0 means size-based grooming is off.
MAX_SIZE_BYTES="${AIRFLOW__LOG_MAX_SIZE_BYTES:-0}"
if [[ "$MAX_SIZE_BYTES" -eq 0 && "$MAX_PERCENT" -gt 0 ]]; then
  # -P (POSIX output format) stops df from wrapping a long device name onto
  # its own line, which would make 'tail -1 | awk' read the wrong column.
  total_space=$(df -kP "${DIRECTORY}"/logs 2>/dev/null | tail -1 | awk '{print $2}' || echo "0")
  # Guard against an empty capture so the arithmetic below cannot error out.
  total_space="${total_space:-0}"
  MAX_SIZE_BYTES=$(( total_space * 1024 * MAX_PERCENT / 100 ))
  echo "Computed MAX_SIZE_BYTES from ${MAX_PERCENT}% of disk: ${MAX_SIZE_BYTES} bytes"
fi

readonly MAX_SIZE_BYTES

# Grooming period in seconds.
readonly EVERY=$((FREQUENCY*60))

echo "Cleaning logs every $EVERY seconds"
if [[ "$MAX_SIZE_BYTES" -gt 0 ]]; then
  echo "Max log size limit: $MAX_SIZE_BYTES bytes"
fi

# Working copy of the retention window; the main loop shrinks it temporarily
# when the size cap is exceeded and resets it to RETENTION each cycle.
retention_days="${RETENTION}"
while true; do
  echo "Trimming airflow logs to ${retention_days} days."

  # Delete log files older than the retention window, pruning the
  # filesystem's lost+found directory. rm failures are non-fatal.
  find "${DIRECTORY}"/logs \
    -type d -name 'lost+found' -prune -o \
    -type f -mtime +"${retention_days}" -name '*.log' -print0 | \
    xargs -0 rm -f || true

  # If a size cap is set and still exceeded, tighten retention by one day and
  # re-trim immediately (the 'continue' skips the sleep below).
  # NOTE(review): df reports usage of the whole filesystem, not just logs/ —
  # consistent with a percent-of-disk cap; confirm intent for byte caps.
  # NOTE(review): retention_days can reach -1 for one pass, producing
  # '-mtime +-1'; '|| true' keeps that harmless, then the guard resets it.
  if [[ "$MAX_SIZE_BYTES" -gt 0 && "$retention_days" -ge 0 ]]; then
    # -P avoids df wrapping long device names, which would shift the column.
    current_size=$(df -kP "${DIRECTORY}"/logs 2>/dev/null | tail -1 | awk '{print $3}' || echo "0")
    current_size=$(( ${current_size:-0} * 1024 ))

    if [[ "$current_size" -gt "$MAX_SIZE_BYTES" ]]; then
      retention_days=$((retention_days - 1))
      echo "Size ($current_size bytes) exceeds limit ($MAX_SIZE_BYTES bytes). Reducing retention to ${retention_days} days."
      continue
    fi
  fi

  # Remove directories emptied by the trim above.
  find "${DIRECTORY}"/logs -type d -empty -delete || true

  # Restore the configured retention for the next cycle.
  retention_days="${RETENTION}"

  # Sleep until the next EVERY-second boundary (aligned on UTC epoch time).
  seconds=$(( $(date -u +%s) % EVERY))
  (( seconds < 1 )) || sleep $((EVERY - seconds - 1))
  sleep 1
Expand Down
70 changes: 70 additions & 0 deletions airflow-core/tests/unit/charts/log_groomer.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,76 @@ def test_log_groomer_frequency_minutes_overrides(self, frequency_minutes, freque
else:
assert len(jmespath.search("spec.template.spec.containers[1].env", docs[0])) == 2

@pytest.mark.parametrize(
    ("max_size_bytes", "max_size_result"), [(None, None), (1234567890, "1234567890")]
)
def test_log_groomer_max_size_bytes_overrides(self, max_size_bytes, max_size_result):
    """AIRFLOW__LOG_MAX_SIZE_BYTES env var is set iff maxSizeBytes is given."""
    sidecar = {"logGroomerSidecar": {"maxSizeBytes": max_size_bytes}}
    # The dag-processor section must be enabled explicitly for it to render.
    if self.obj_name == "dag-processor":
        values = {"dagProcessor": {"enabled": True, **sidecar}}
    else:
        values = {self.folder: sidecar}

    docs = render_chart(
        values=values,
        show_only=[f"templates/{self.folder}/{self.obj_name}-deployment.yaml"],
    )

    env_query = (
        "spec.template.spec.containers[1].env"
        "[?name=='AIRFLOW__LOG_MAX_SIZE_BYTES'].value | [0]"
    )
    actual = jmespath.search(env_query, docs[0])
    if max_size_result:
        assert actual == max_size_result
    else:
        assert actual is None

@pytest.mark.parametrize(("max_size_percent", "max_size_result"), [(None, None), (80, "80")])
def test_log_groomer_max_size_percent_overrides(self, max_size_percent, max_size_result):
    """AIRFLOW__LOG_MAX_SIZE_PERCENT env var is set iff maxSizePercent is given."""
    sidecar = {"logGroomerSidecar": {"maxSizePercent": max_size_percent}}
    # The dag-processor section must be enabled explicitly for it to render.
    if self.obj_name == "dag-processor":
        values = {"dagProcessor": {"enabled": True, **sidecar}}
    else:
        values = {self.folder: sidecar}

    docs = render_chart(
        values=values,
        show_only=[f"templates/{self.folder}/{self.obj_name}-deployment.yaml"],
    )

    env_query = (
        "spec.template.spec.containers[1].env"
        "[?name=='AIRFLOW__LOG_MAX_SIZE_PERCENT'].value | [0]"
    )
    actual = jmespath.search(env_query, docs[0])
    if max_size_result:
        assert actual == max_size_result
    else:
        assert actual is None

def test_log_groomer_resources(self):
if self.obj_name == "dag-processor":
values = {
Expand Down
8 changes: 8 additions & 0 deletions chart/templates/dag-processor/dag-processor-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,14 @@ spec:
{{- if .Values.dagProcessor.logGroomerSidecar.frequencyMinutes }}
- name: AIRFLOW__LOG_CLEANUP_FREQUENCY_MINUTES
value: "{{ .Values.dagProcessor.logGroomerSidecar.frequencyMinutes }}"
{{- end }}
{{- if .Values.dagProcessor.logGroomerSidecar.maxSizeBytes }}
- name: AIRFLOW__LOG_MAX_SIZE_BYTES
value: "{{ .Values.dagProcessor.logGroomerSidecar.maxSizeBytes | int64 }}"
{{- end }}
{{- if .Values.dagProcessor.logGroomerSidecar.maxSizePercent }}
- name: AIRFLOW__LOG_MAX_SIZE_PERCENT
value: "{{ .Values.dagProcessor.logGroomerSidecar.maxSizePercent }}"
{{- end }}
- name: AIRFLOW_HOME
value: "{{ .Values.airflowHome }}"
Expand Down
8 changes: 8 additions & 0 deletions chart/templates/scheduler/scheduler-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,14 @@ spec:
{{- if .Values.scheduler.logGroomerSidecar.frequencyMinutes }}
- name: AIRFLOW__LOG_CLEANUP_FREQUENCY_MINUTES
value: "{{ .Values.scheduler.logGroomerSidecar.frequencyMinutes }}"
{{- end }}
{{- if .Values.scheduler.logGroomerSidecar.maxSizeBytes }}
- name: AIRFLOW__LOG_MAX_SIZE_BYTES
value: "{{ .Values.scheduler.logGroomerSidecar.maxSizeBytes | int64 }}"
{{- end }}
{{- if .Values.scheduler.logGroomerSidecar.maxSizePercent }}
- name: AIRFLOW__LOG_MAX_SIZE_PERCENT
value: "{{ .Values.scheduler.logGroomerSidecar.maxSizePercent }}"
{{- end }}
- name: AIRFLOW_HOME
value: "{{ .Values.airflowHome }}"
Expand Down
8 changes: 8 additions & 0 deletions chart/templates/triggerer/triggerer-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,14 @@ spec:
{{- if .Values.triggerer.logGroomerSidecar.frequencyMinutes }}
- name: AIRFLOW__LOG_CLEANUP_FREQUENCY_MINUTES
value: "{{ .Values.triggerer.logGroomerSidecar.frequencyMinutes }}"
{{- end }}
{{- if .Values.triggerer.logGroomerSidecar.maxSizeBytes }}
- name: AIRFLOW__LOG_MAX_SIZE_BYTES
value: "{{ .Values.triggerer.logGroomerSidecar.maxSizeBytes | int64 }}"
{{- end }}
{{- if .Values.triggerer.logGroomerSidecar.maxSizePercent }}
- name: AIRFLOW__LOG_MAX_SIZE_PERCENT
value: "{{ .Values.triggerer.logGroomerSidecar.maxSizePercent }}"
{{- end }}
- name: AIRFLOW_HOME
value: "{{ .Values.airflowHome }}"
Expand Down
8 changes: 8 additions & 0 deletions chart/templates/workers/worker-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,14 @@ spec:
{{- if .Values.workers.logGroomerSidecar.frequencyMinutes }}
- name: AIRFLOW__LOG_CLEANUP_FREQUENCY_MINUTES
value: "{{ .Values.workers.logGroomerSidecar.frequencyMinutes }}"
{{- end }}
{{- if .Values.workers.logGroomerSidecar.maxSizeBytes }}
- name: AIRFLOW__LOG_MAX_SIZE_BYTES
value: "{{ .Values.workers.logGroomerSidecar.maxSizeBytes | int64 }}"
{{- end }}
{{- if .Values.workers.logGroomerSidecar.maxSizePercent }}
- name: AIRFLOW__LOG_MAX_SIZE_PERCENT
value: "{{ .Values.workers.logGroomerSidecar.maxSizePercent }}"
{{- end }}
- name: AIRFLOW_HOME
value: "{{ .Values.airflowHome }}"
Expand Down
13 changes: 13 additions & 0 deletions chart/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -13732,6 +13732,19 @@
"type": "integer",
"default": 15
},
"maxSizeBytes": {
"description": "Max size of logs directory in bytes. When exceeded, the log groomer reduces retention until size is under limit. 0 = disabled.",
"type": "integer",
"default": 0,
"minimum": 0
},
"maxSizePercent": {
"description": "Max size of logs as a percentage of total disk space. When exceeded, the log groomer reduces retention until size is under limit. 0 = disabled. Ignored if maxSizeBytes is set.",
"type": "integer",
"default": 0,
"minimum": 0,
"maximum": 100
},
"env": {
"description": "Add additional env vars to log groomer sidecar container (templated).",
"items": {
Expand Down
18 changes: 18 additions & 0 deletions chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,12 @@ workers:
# Frequency to attempt to groom logs (in minutes)
frequencyMinutes: 15

# Max size of logs in bytes. 0 = disabled
maxSizeBytes: 0

# Max size of logs as a percent of total disk space. 0 = disabled. Ignored if maxSizeBytes is set.
maxSizePercent: 0

resources: {}
# limits:
# cpu: 100m
Expand Down Expand Up @@ -1363,6 +1369,10 @@ scheduler:
retentionDays: 15
# frequency to attempt to groom logs, in minutes
frequencyMinutes: 15
# Max size of logs in bytes. 0 = disabled
maxSizeBytes: 0
# Max size of logs as a percent of total disk space. 0 = disabled. Ignored if maxSizeBytes is set.
maxSizePercent: 0
resources: {}
# limits:
# cpu: 100m
Expand Down Expand Up @@ -2203,6 +2213,10 @@ triggerer:
retentionDays: 15
# frequency to attempt to groom logs, in minutes
frequencyMinutes: 15
# Max size of logs in bytes. 0 = disabled
maxSizeBytes: 0
# Max size of logs as a percent of total disk space. 0 = disabled. Ignored if maxSizeBytes is set.
maxSizePercent: 0
resources: {}
# limits:
# cpu: 100m
Expand Down Expand Up @@ -2429,6 +2443,10 @@ dagProcessor:
retentionDays: 15
# frequency to attempt to groom logs, in minutes
frequencyMinutes: 15
# Max size of logs in bytes. 0 = disabled
maxSizeBytes: 0
# Max size of logs as a percent of total disk space. 0 = disabled. Ignored if maxSizeBytes is set.
maxSizePercent: 0
resources: {}
# limits:
# cpu: 100m
Expand Down
70 changes: 70 additions & 0 deletions helm-tests/tests/chart_utils/log_groomer.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,76 @@ def test_log_groomer_frequency_minutes_overrides(self, frequency_minutes, freque
else:
assert len(jmespath.search("spec.template.spec.containers[1].env", docs[0])) == 2

@pytest.mark.parametrize(
    ("max_size_bytes", "max_size_result"), [(None, None), (1234567890, "1234567890")]
)
def test_log_groomer_max_size_bytes_overrides(self, max_size_bytes, max_size_result):
    """AIRFLOW__LOG_MAX_SIZE_BYTES env var is set iff maxSizeBytes is given."""
    sidecar = {"logGroomerSidecar": {"maxSizeBytes": max_size_bytes}}
    # The dag-processor section must be enabled explicitly for it to render.
    if self.obj_name == "dag-processor":
        values = {"dagProcessor": {"enabled": True, **sidecar}}
    else:
        values = {self.folder: sidecar}

    docs = render_chart(
        values=values,
        show_only=[f"templates/{self.folder}/{self.obj_name}-deployment.yaml"],
    )

    env_query = (
        "spec.template.spec.containers[1].env"
        "[?name=='AIRFLOW__LOG_MAX_SIZE_BYTES'].value | [0]"
    )
    actual = jmespath.search(env_query, docs[0])
    if max_size_result:
        assert actual == max_size_result
    else:
        assert actual is None

@pytest.mark.parametrize(("max_size_percent", "max_size_result"), [(None, None), (80, "80")])
def test_log_groomer_max_size_percent_overrides(self, max_size_percent, max_size_result):
    """AIRFLOW__LOG_MAX_SIZE_PERCENT env var is set iff maxSizePercent is given."""
    sidecar = {"logGroomerSidecar": {"maxSizePercent": max_size_percent}}
    # The dag-processor section must be enabled explicitly for it to render.
    if self.obj_name == "dag-processor":
        values = {"dagProcessor": {"enabled": True, **sidecar}}
    else:
        values = {self.folder: sidecar}

    docs = render_chart(
        values=values,
        show_only=[f"templates/{self.folder}/{self.obj_name}-deployment.yaml"],
    )

    env_query = (
        "spec.template.spec.containers[1].env"
        "[?name=='AIRFLOW__LOG_MAX_SIZE_PERCENT'].value | [0]"
    )
    actual = jmespath.search(env_query, docs[0])
    if max_size_result:
        assert actual == max_size_result
    else:
        assert actual is None

def test_log_groomer_resources(self):
if self.obj_name == "dag-processor":
values = {
Expand Down
32 changes: 30 additions & 2 deletions scripts/docker/clean-logs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,50 @@ set -euo pipefail
# Log-groomer configuration; every knob is overridable via environment.
readonly DIRECTORY="${AIRFLOW_HOME:-/usr/local/airflow}"
readonly RETENTION="${AIRFLOW__LOG_RETENTION_DAYS:-15}"
readonly FREQUENCY="${AIRFLOW__LOG_CLEANUP_FREQUENCY_MINUTES:-15}"
readonly MAX_PERCENT="${AIRFLOW__LOG_MAX_SIZE_PERCENT:-0}"

trap "exit" INT TERM

# An explicit byte limit wins; otherwise derive one from MAX_PERCENT of the
# filesystem hosting the logs directory. 0 means size-based grooming is off.
MAX_SIZE_BYTES="${AIRFLOW__LOG_MAX_SIZE_BYTES:-0}"
if [[ "$MAX_SIZE_BYTES" -eq 0 && "$MAX_PERCENT" -gt 0 ]]; then
  # -P (POSIX output format) stops df from wrapping a long device name onto
  # its own line, which would make 'tail -1 | awk' read the wrong column.
  total_space=$(df -kP "${DIRECTORY}"/logs 2>/dev/null | tail -1 | awk '{print $2}' || echo "0")
  # Guard against an empty capture so the arithmetic below cannot error out.
  total_space="${total_space:-0}"
  MAX_SIZE_BYTES=$(( total_space * 1024 * MAX_PERCENT / 100 ))
  echo "Computed MAX_SIZE_BYTES from ${MAX_PERCENT}% of disk: ${MAX_SIZE_BYTES} bytes"
fi

readonly MAX_SIZE_BYTES

# Grooming period in seconds.
readonly EVERY=$((FREQUENCY*60))

echo "Cleaning logs every $EVERY seconds"
if [[ "$MAX_SIZE_BYTES" -gt 0 ]]; then
  echo "Max log size limit: $MAX_SIZE_BYTES bytes"
fi

# Working copy of the retention window; the main loop shrinks it temporarily
# when the size cap is exceeded and resets it to RETENTION each cycle.
retention_days="${RETENTION}"

while true; do
  echo "Trimming airflow logs to ${retention_days} days."

  # Delete log files older than the retention window, pruning the
  # filesystem's lost+found directory. rm failures are non-fatal.
  find "${DIRECTORY}"/logs \
    -type d -name 'lost+found' -prune -o \
    -type f -mtime +"${retention_days}" -name '*.log' -print0 | \
    xargs -0 rm -f || true

  # If a size cap is set and still exceeded, tighten retention by one day and
  # re-trim immediately (the 'continue' skips the sleep at loop end).
  # NOTE(review): df reports usage of the whole filesystem, not just logs/ —
  # consistent with a percent-of-disk cap; confirm intent for byte caps.
  # NOTE(review): retention_days can reach -1 for one pass, producing
  # '-mtime +-1'; '|| true' keeps that harmless, then the guard resets it.
  if [[ "$MAX_SIZE_BYTES" -gt 0 && "$retention_days" -ge 0 ]]; then
    # -P avoids df wrapping long device names, which would shift the column.
    current_size=$(df -kP "${DIRECTORY}"/logs 2>/dev/null | tail -1 | awk '{print $3}' || echo "0")
    current_size=$(( ${current_size:-0} * 1024 ))

    if [[ "$current_size" -gt "$MAX_SIZE_BYTES" ]]; then
      retention_days=$((retention_days - 1))
      echo "Size ($current_size bytes) exceeds limit ($MAX_SIZE_BYTES bytes). Reducing retention to ${retention_days} days."
      continue
    fi
  fi

  # Remove directories emptied by the trim above.
  find "${DIRECTORY}"/logs -type d -empty -delete || true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L65 should be executed before L54

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @n-badtke-cg , I don't quite see the logic bug you're referring to, could you elaborate?

The way I think it works is the following but perhaps I've missed something.

  1. Delete log files based on retention days
  2. If Max size is set and size exceed max size reduce retention days by 1 and go to step 1, otherwise go to step 3.
  3. Delete empty log directories
  4. Sleep

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @n-badtke-cg , I don't quite see the logic bug you're referring to, could you elaborate?

The way I think it works is the following but perhaps I've missed something.

  1. Delete log files based on retention days
  2. If Max size is set and size exceed max size reduce retention days by 1 and go to step 1, otherwise go to step 3.
  3. Delete empty log directories
  4. Sleep

I guess you are referring to an old message by me "found a logical bug" that was a false message by me, sorry for that!

And also with this message, I just misinterpret the code, sorry. It's actually kinda nice. It's reducing the retention time temporarily to free up more space in specific situations when needed. After the retention period has been temporarily shortened and storage space freed up, the retention period is reset to the env value.

Fair. Again sorry for the unnecessary messages.


# Restore the configured retention window for the next grooming cycle
# (it may have been reduced temporarily by the size-cap check above).
retention_days="${RETENTION}"

# Sleep until the next EVERY-second boundary, aligned on UTC epoch time,
# so wake-ups stay on a fixed schedule regardless of how long grooming took.
seconds=$(( $(date -u +%s) % EVERY))
(( seconds < 1 )) || sleep $((EVERY - seconds - 1))
sleep 1
Expand Down
Loading