Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
5fb3647b8dc66e0c22e13797f2b3b05a9c09ac9e4a6d0d82b03f1556197fa219
4e7d2561b35ebfa7f782acf98d1b0517aec5cfac88c88839d4f1d1913ac82e3f
4,813 changes: 2,605 additions & 2,208 deletions airflow-core/docs/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/migrations/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def run_migrations_offline():

"""
context.configure(
url=settings.SQL_ALCHEMY_CONN,
url=settings.get_sql_alchemy_conn(),
target_metadata=target_metadata,
literal_binds=True,
compare_type=compare_type,
Expand Down Expand Up @@ -114,7 +114,7 @@ def process_revision_directives(context, revision, directives):
connection = config.attributes.get("connection", None)

if not connection:
connection = stack.push(settings.engine.connect())
connection = stack.push(settings.get_engine().connect())

context.configure(
connection=connection,
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,9 +572,9 @@ def write_dag(

@classmethod
def latest_item_select_object(cls, dag_id):
from airflow.settings import engine
from airflow.settings import get_engine

if engine.dialect.name == "mysql":
if get_engine().dialect.name == "mysql":
# Prevent "Out of sort memory" caused by large values in cls.data column for MySQL.
# Details in https://github.com/apache/airflow/pull/55589
latest_item_id = (
Expand Down
3 changes: 1 addition & 2 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1220,8 +1220,7 @@ def _check_and_change_state_before_execution(

# Closing all pooled connections to prevent
# "max number of connections reached"
if settings.engine is not None:
settings.engine.dispose()
settings.dispose_orm()
if verbose:
if mark_success:
cls.logger().info("Marking success for %s on %s", ti.task, ti.logical_date)
Expand Down
209 changes: 135 additions & 74 deletions airflow-core/src/airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,27 +107,108 @@

SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")

SQL_ALCHEMY_CONN: str | None = None
SQL_ALCHEMY_CONN_ASYNC: str | None = None

class _AirflowDBSettings:
"""Singleton class to hold Airflow DB settings and engine."""

block_orm_access_blocked: bool = False
sql_alchemy_conn: str | None = None
sql_alchemy_conn_async: str | None = None
engine: Engine | None = None
async_engine: AsyncEngine | None = None

_instance = None

def __new__(cls, *args, **kwargs):
"""Ensure this class is a singleton."""
if not cls._instance:
cls._instance = super().__new__(cls)
return cls._instance

def block_orm_access(self):
self.block_orm_access_blocked = True

def get_sql_alchemy_conn(self) -> str:
if self.block_orm_access_blocked:
raise AttributeError("Access to the Airflow Metadatabase from dags is not allowed!")
if self.sql_alchemy_conn is None:
raise RuntimeError("SQLAlchemy connection string not configured. Call configure_vars() first.")
return self.sql_alchemy_conn

def get_sql_alchemy_conn_async(self) -> str:
if self.block_orm_access_blocked:
raise AttributeError("Access to the Airflow Metadatabase from dags is not allowed!")
if self.sql_alchemy_conn_async is None:
raise RuntimeError(
"Asynchronous SQLAlchemy connection string not configured. Call configure_vars() first."
)
return self.sql_alchemy_conn_async

def _get_async_conn_uri_from_sync(self, sync_uri: str) -> str:
AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"}
"""Mapping of sync scheme to async scheme."""

scheme, rest = sync_uri.split(":", maxsplit=1)
scheme = scheme.split("+", maxsplit=1)[0]
aiolib = AIO_LIBS_MAPPING.get(scheme)
if aiolib:
return f"{scheme}+{aiolib}:{rest}"
return sync_uri

def set_sql_alchemy_conn(self, conn: str, async_conn: str | None = None) -> None:
self.sql_alchemy_conn = conn
if async_conn:
self.sql_alchemy_conn_async = async_conn
else:
self.sql_alchemy_conn_async = self._get_async_conn_uri_from_sync(conn)

def get_engine(self) -> Engine:
if self.block_orm_access_blocked:
raise AttributeError("Access to the Airflow Metadatabase from dags is not allowed!")
if self.engine is None:
raise RuntimeError("Engine not configured. Call configure_orm() first.")
return self.engine

def set_engine(self, engine: Engine, async_engine: AsyncEngine | None) -> None:
self.engine = engine
self.async_engine = async_engine

def dispose_engine(self) -> None:
if self.engine is not None:
self.engine.dispose()
self.engine = None


PLUGINS_FOLDER: str | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))

engine: Engine | None = None
Session: scoped_session | None = None
# NonScopedSession creates global sessions and is not safe to use in multi-threaded environment without
# additional precautions. The only use case is when the session lifecycle needs
# custom handling. Most of the time we only want one unique thread local session object,
# this is achieved by the Session factory above.
NonScopedSession: sessionmaker | None = None
async_engine: AsyncEngine | None = None
AsyncSession: Callable[..., SAAsyncSession] | None = None


def get_engine():
def block_orm_access():
    """Flip the process-wide flag so any further ORM access raises."""
    db_settings = _AirflowDBSettings()
    db_settings.block_orm_access()


def get_sql_alchemy_conn() -> str:
    """Return the configured SQLAlchemy connection string; raise when unset."""
    db_settings = _AirflowDBSettings()
    return db_settings.get_sql_alchemy_conn()


def get_sql_alchemy_conn_async() -> str:
    """Return the configured async SQLAlchemy connection string; raise when unset."""
    db_settings = _AirflowDBSettings()
    return db_settings.get_sql_alchemy_conn_async()


def get_engine() -> Engine:
    """Return the configured engine; raise a RuntimeError when not configured."""
    db_settings = _AirflowDBSettings()
    return db_settings.get_engine()


def get_session():
Expand Down Expand Up @@ -233,30 +314,15 @@ def load_policy_plugins(pm: pluggy.PluginManager):
pm.load_setuptools_entrypoints("airflow.policy")


def _get_async_conn_uri_from_sync(sync_uri):
AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"}
"""Mapping of sync scheme to async scheme."""

scheme, rest = sync_uri.split(":", maxsplit=1)
scheme = scheme.split("+", maxsplit=1)[0]
aiolib = AIO_LIBS_MAPPING.get(scheme)
if aiolib:
return f"{scheme}+{aiolib}:{rest}"
return sync_uri


def configure_vars():
"""Configure Global Variables from airflow.cfg."""
global SQL_ALCHEMY_CONN
global SQL_ALCHEMY_CONN_ASYNC
global DAGS_FOLDER
global PLUGINS_FOLDER

SQL_ALCHEMY_CONN = conf.get("database", "sql_alchemy_conn")
if conf.has_option("database", "sql_alchemy_conn_async"):
SQL_ALCHEMY_CONN_ASYNC = conf.get("database", "sql_alchemy_conn_async")
else:
SQL_ALCHEMY_CONN_ASYNC = _get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN)
_AirflowDBSettings().set_sql_alchemy_conn(
conf.get("database", "sql_alchemy_conn"),
conf.get("database", "sql_alchemy_conn_async", fallback=None),
)

DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))

Expand Down Expand Up @@ -347,72 +413,49 @@ def _get_connect_args(mode: Literal["sync", "async"]) -> Any:
return {}


def _configure_async_session() -> None:
"""
Configure async SQLAlchemy session.

This exists so tests can reconfigure the session. How SQLAlchemy configures
this does not work well with Pytest and you can end up with issues when the
session and runs in a different event loop from the test itself.
"""
global AsyncSession, async_engine

if not SQL_ALCHEMY_CONN_ASYNC:
async_engine = None
AsyncSession = None
return

async_engine = create_async_engine(
SQL_ALCHEMY_CONN_ASYNC,
connect_args=_get_connect_args("async"),
future=True,
)
AsyncSession = async_sessionmaker(
bind=async_engine,
class_=SAAsyncSession,
autoflush=False,
expire_on_commit=False,
)


def configure_orm(disable_connection_pool=False, pool_class=None):
"""Configure ORM using SQLAlchemy."""
from airflow._shared.secrets_masker import mask_secret

if _is_sqlite_db_path_relative(SQL_ALCHEMY_CONN):
if _is_sqlite_db_path_relative(get_sql_alchemy_conn()):
from airflow.exceptions import AirflowConfigException

raise AirflowConfigException(
f"Cannot use relative path: `{SQL_ALCHEMY_CONN}` to connect to sqlite. "
f"Cannot use relative path: `{get_sql_alchemy_conn()}` to connect to sqlite. "
"Please use absolute path such as `sqlite:////tmp/airflow.db`."
)

global NonScopedSession
global Session
global engine
global AsyncSession

if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
# Skip DB initialization in unit tests, if DB tests are skipped
Session = SkipDBTestsSession
engine = None
_AirflowDBSettings.engine = None
return
log.debug("Setting up DB connection pool (PID %s)", os.getpid())
engine_args = prepare_engine_args(disable_connection_pool, pool_class)

connect_args = _get_connect_args("sync")
if SQL_ALCHEMY_CONN.startswith("sqlite"):
if get_sql_alchemy_conn().startswith("sqlite"):
# FastAPI runs sync endpoints in a separate thread. SQLite does not allow
# to use objects created in another threads by default. Allowing that in test
# to so the `test` thread and the tested endpoints can use common objects.
connect_args["check_same_thread"] = False

engine = create_engine(
SQL_ALCHEMY_CONN,
get_sql_alchemy_conn(),
connect_args=connect_args,
**engine_args,
future=True,
)
_configure_async_session()
async_engine = create_async_engine(
get_sql_alchemy_conn_async(),
connect_args=_get_connect_args("async"),
future=True,
)
_AirflowDBSettings().set_engine(engine, async_engine)
mask_secret(engine.url.password)
setup_event_handlers(engine)

Expand All @@ -425,10 +468,14 @@ def configure_orm(disable_connection_pool=False, pool_class=None):
autoflush=False,
expire_on_commit=False,
)
if engine is None:
raise RuntimeError("Engine must be initialized before creating a session")
NonScopedSession = _session_maker(engine)
Session = scoped_session(NonScopedSession)
AsyncSession = async_sessionmaker(
bind=async_engine,
class_=SAAsyncSession,
autoflush=False,
expire_on_commit=False,
)

if register_at_fork := getattr(os, "register_at_fork", None):
# https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
Expand Down Expand Up @@ -460,7 +507,7 @@ def prepare_engine_args(disable_connection_pool=False, pool_class=None):

default_args = {}
for dialect, default in DEFAULT_ENGINE_ARGS.items():
if SQL_ALCHEMY_CONN.startswith(dialect):
if get_sql_alchemy_conn().startswith(dialect):
default_args = default.copy()
break

Expand All @@ -472,7 +519,7 @@ def prepare_engine_args(disable_connection_pool=False, pool_class=None):
elif disable_connection_pool or not conf.getboolean("database", "SQL_ALCHEMY_POOL_ENABLED"):
engine_args["poolclass"] = NullPool
log.debug("settings.prepare_engine_args(): Using NullPool")
elif not SQL_ALCHEMY_CONN.startswith("sqlite"):
elif not get_sql_alchemy_conn().startswith("sqlite"):
# Pool size engine args not supported by sqlite.
# If no config value is defined for the pool size, select a reasonable value.
# 0 means no limit, which could lead to exceeding the Database connection limit.
Expand Down Expand Up @@ -521,18 +568,18 @@ def prepare_engine_args(disable_connection_pool=False, pool_class=None):
# 'READ COMMITTED' is the default value for PostgreSQL.
# More information here:
# https://dev.mysql.com/doc/refman/8.0/en/innodb-transaction-isolation-levels.html
if SQL_ALCHEMY_CONN.startswith("mysql"):
if get_sql_alchemy_conn().startswith("mysql"):
engine_args["isolation_level"] = "READ COMMITTED"

return engine_args


def dispose_orm(do_log: bool = True):
"""Properly close pooled database connections."""
global Session, engine, NonScopedSession
global Session, NonScopedSession

_globals = globals()
if _globals.get("engine") is None and _globals.get("Session") is None:
if get_engine() is None and _globals.get("Session") is None:
return

if do_log:
Expand All @@ -546,9 +593,7 @@ def dispose_orm(do_log: bool = True):
NonScopedSession = None
close_all_sessions()

if "engine" in _globals and engine is not None:
engine.dispose()
engine = None
_AirflowDBSettings().dispose_engine()


def reconfigure_orm(disable_connection_pool=False, pool_class=None):
Expand All @@ -561,12 +606,12 @@ def configure_adapters():
"""Register Adapters and DB Converters."""
from pendulum import DateTime as Pendulum

if SQL_ALCHEMY_CONN.startswith("sqlite"):
if get_sql_alchemy_conn().startswith("sqlite"):
from sqlite3 import register_adapter

register_adapter(Pendulum, lambda val: val.isoformat(" "))

if SQL_ALCHEMY_CONN.startswith("mysql"):
if get_sql_alchemy_conn().startswith("mysql"):
try:
try:
import MySQLdb.converters
Expand Down Expand Up @@ -643,6 +688,22 @@ def __getattr__(name: str):

from airflow.exceptions import RemovedInAirflow4Warning

if name == "SQL_ALCHEMY_CONN":
warnings.warn(
"settings.SQL_ALCHEMY_CONN has been replaced by get_sql_alchemy_conn(). This shim is just for compatibility. "
"Please upgrade your provider or integration.",
RemovedInAirflow4Warning,
stacklevel=2,
)
return get_sql_alchemy_conn()
if name == "SQL_ALCHEMY_CONN_ASYNC":
warnings.warn(
"settings.SQL_ALCHEMY_CONN_ASYNC has been replaced by get_sql_alchemy_conn_async(). This shim is just for compatibility. "
"Please upgrade your provider or integration.",
RemovedInAirflow4Warning,
stacklevel=2,
)
return get_sql_alchemy_conn_async()
if name == "MASK_SECRETS_IN_LOGS":
warnings.warn(
"settings.MASK_SECRETS_IN_LOGS has been removed. This shim returns default value of False. "
Expand Down
Loading
Loading