Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions ocpi/core/authentication/verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,30 +135,31 @@ async def __call__(

class VersionsAuthorizationVerifier(CredentialsAuthorizationVerifier):
"""
A class responsible for verifying authorization tokens
based on the specified version number.
Verifies authorization for versions and version details endpoints.
When VERSIONS_REQUIRE_AUTH is False, allows unauthenticated access (discovery).
"""

async def __call__(
self,
authorization: str = auth_verifier,
authorization: str = Header(default="", alias="Authorization"),
authenticator: Authenticator = Depends(get_authenticator),
) -> str | dict | None:
"""
Verifies the authorization token using the specified version
and an Authenticator for version endpoints.

:param authorization (str): The authorization header containing
the token.
:param authenticator (Authenticator): An Authenticator instance used
for authentication.

:raises AuthorizationOCPIError: If there is an issue with
the authorization token.
Verifies the authorization token for version endpoints.
If VERSIONS_REQUIRE_AUTH is False, allows requests without token.
"""
if settings.NO_AUTH and authorization == "":
logger.debug("Authentication skipped due to NO_AUTH setting.")
return ""
if not settings.VERSIONS_REQUIRE_AUTH and (
not authorization or authorization.strip() == ""
):
logger.debug(
"Versions/details accessed without auth (VERSIONS_REQUIRE_AUTH=false)."
)
return ""
if not authorization or authorization.strip() == "":
return None
return await super().__call__(authorization, authenticator)


Expand Down
5 changes: 4 additions & 1 deletion ocpi/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ class Settings(BaseSettings):

ENVIRONMENT: str = "production"
NO_AUTH: bool = False
# When False, versions and version details endpoints can be accessed without auth (discovery).
# Other endpoints still require auth. Set via VERSIONS_REQUIRE_AUTH env var.
VERSIONS_REQUIRE_AUTH: bool = True
PROJECT_NAME: str = "OCPI"
BACKEND_CORS_ORIGINS: list[AnyHttpUrl] = []
OCPI_HOST: str = "www.example.com"
Expand All @@ -36,7 +39,7 @@ class Settings(BaseSettings):
def assemble_cors_origins(cls, v: str | list[str]) -> list[str] | str:
if isinstance(v, str) and not v.startswith("["):
return [i.strip() for i in v.split(",")]
if isinstance(v, (list, str)):
if isinstance(v, list | str):
return v
raise ValueError(v)

Expand Down
48 changes: 48 additions & 0 deletions tests/test_modules/test_v_2_2_1/test_versions/test_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,51 @@ async def do(cls, *args, **kwargs):
)

assert response.status_code == 401


def test_get_versions_without_auth_when_optional(monkeypatch):
"""When VERSIONS_REQUIRE_AUTH=False, /versions works without Authorization header."""
monkeypatch.setattr("ocpi.core.config.settings.VERSIONS_REQUIRE_AUTH", False)

class MockCrud(Crud):
@classmethod
async def do(cls, *args, **kwargs):
return None

app = get_application(
version_numbers=[VersionNumber.v_2_2_1],
roles=[enums.RoleEnum.cpo],
crud=MockCrud,
authenticator=ClientAuthenticator,
modules=[],
)
client = TestClient(app)

response = client.get(VERSIONS_URL)

assert response.status_code == 200
assert len(response.json()["data"]) == 1


def test_get_version_details_without_auth_when_optional(monkeypatch):
"""When VERSIONS_REQUIRE_AUTH=False, /{version}/details works without Authorization header."""
monkeypatch.setattr("ocpi.core.config.settings.VERSIONS_REQUIRE_AUTH", False)

class MockCrud(Crud):
@classmethod
async def do(cls, *args, **kwargs):
return None

app = get_application(
version_numbers=[VersionNumber.v_2_2_1],
roles=[enums.RoleEnum.cpo],
crud=MockCrud,
authenticator=ClientAuthenticator,
modules=[],
)
client = TestClient(app)

response = client.get(VERSION_URL)

assert response.status_code == 200
assert len(response.json()["data"]) == 2
7 changes: 7 additions & 0 deletions tests/test_modules/test_v_2_3_0/test_versions/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from tests.test_modules.utils import (
ENCODED_AUTH_TOKEN_V_2_3_0,
ENCODED_RANDOM_AUTH_TOKEN_V_2_3_0,
)

AUTH_HEADERS = {"Authorization": f"Token {ENCODED_AUTH_TOKEN_V_2_3_0}"}
WRONG_AUTH_HEADERS = {"Authorization": f"Token {ENCODED_RANDOM_AUTH_TOKEN_V_2_3_0}"}
156 changes: 156 additions & 0 deletions tests/test_modules/test_v_2_3_0/test_versions/test_versions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from fastapi.testclient import TestClient

from ocpi.core import enums
from ocpi.core.crud import Crud
from ocpi.main import get_application
from ocpi.modules.versions.enums import VersionNumber
from tests.test_modules.test_v_2_3_0.test_versions.test_utils import (
AUTH_HEADERS,
WRONG_AUTH_HEADERS,
)
from tests.test_modules.utils import AUTH_TOKEN, ClientAuthenticator

VERSIONS_URL = "/ocpi/versions"
VERSION_URL = "/ocpi/2.3.0/details"


def test_get_versions():
class MockCrud(Crud):
@classmethod
async def do(cls, *args, **kwargs):
return AUTH_TOKEN

app = get_application(
version_numbers=[VersionNumber.v_2_3_0],
roles=[enums.RoleEnum.cpo],
crud=MockCrud,
authenticator=ClientAuthenticator,
modules=[],
)
client = TestClient(app)

response = client.get(
VERSIONS_URL,
headers=AUTH_HEADERS,
)

assert response.status_code == 200
assert len(response.json()["data"]) == 1


def test_get_versions_not_authenticated():
class MockCrud(Crud):
@classmethod
async def do(cls, *args, **kwargs):
return None

app = get_application(
version_numbers=[VersionNumber.v_2_3_0],
roles=[enums.RoleEnum.cpo],
crud=MockCrud,
authenticator=ClientAuthenticator,
modules=[],
)
client = TestClient(app)

response = client.get(
VERSIONS_URL,
headers=WRONG_AUTH_HEADERS,
)

assert response.status_code == 401


def test_get_versions_v_2_3_0():
class MockCrud(Crud):
@classmethod
async def do(cls, *args, **kwargs):
return AUTH_TOKEN

app = get_application(
version_numbers=[VersionNumber.v_2_3_0],
roles=[enums.RoleEnum.cpo],
crud=MockCrud,
authenticator=ClientAuthenticator,
modules=[],
)
client = TestClient(app)

response = client.get(
VERSION_URL,
headers=AUTH_HEADERS,
)

assert response.status_code == 200
assert len(response.json()["data"]) == 2


def test_get_versions_v_2_3_0_not_authenticated():
class MockCrud(Crud):
@classmethod
async def do(cls, *args, **kwargs):
return None

app = get_application(
version_numbers=[VersionNumber.v_2_3_0],
roles=[enums.RoleEnum.cpo],
crud=MockCrud,
authenticator=ClientAuthenticator,
modules=[],
)
client = TestClient(app)

response = client.get(
VERSION_URL,
headers=WRONG_AUTH_HEADERS,
)

assert response.status_code == 401


def test_get_versions_without_auth_when_optional(monkeypatch):
"""When VERSIONS_REQUIRE_AUTH=False, /versions works without Authorization header."""
monkeypatch.setattr("ocpi.core.config.settings.VERSIONS_REQUIRE_AUTH", False)

class MockCrud(Crud):
@classmethod
async def do(cls, *args, **kwargs):
return None

app = get_application(
version_numbers=[VersionNumber.v_2_3_0],
roles=[enums.RoleEnum.cpo],
crud=MockCrud,
authenticator=ClientAuthenticator,
modules=[],
)
client = TestClient(app)

response = client.get(VERSIONS_URL)

assert response.status_code == 200
assert len(response.json()["data"]) == 1


def test_get_version_details_without_auth_when_optional(monkeypatch):
"""When VERSIONS_REQUIRE_AUTH=False, /2.3.0/details works without Authorization header."""
monkeypatch.setattr("ocpi.core.config.settings.VERSIONS_REQUIRE_AUTH", False)

class MockCrud(Crud):
@classmethod
async def do(cls, *args, **kwargs):
return None

app = get_application(
version_numbers=[VersionNumber.v_2_3_0],
roles=[enums.RoleEnum.cpo],
crud=MockCrud,
authenticator=ClientAuthenticator,
modules=[],
)
client = TestClient(app)

response = client.get(VERSION_URL)

assert response.status_code == 200
assert len(response.json()["data"]) == 2
Loading