Skip to content

[Enh]: Add Expr.str.json_decode #3506

@thomasaarholt

Description

@thomasaarholt

Note

Edited by @dangotbanned

We would like to learn about your use case. For example, if this feature is needed to adopt Narwhals in an open source project, could you please enter the link to it below?

I'm trying to introduce narwhals at Microsoft. I'm a data scientist working there, analyzing huge data in Azure ML.

Please describe the purpose of the new feature or describe the problem to solve.

We often get (heavily!) nested json data structures in the dataframes we get through various APIs.

Polars and PySpark both have implementations to deal with this, and I've written a function that incorporates both:

Show function

from typing import TypeVar

import polars as pl
import pyspark.sql.functions as F
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.types import StructType

# Generic frame type: lets decode_json_columns return the same concrete frame
# type it was given (Polars eager/lazy or PySpark DataFrame).
Frame = TypeVar("Frame", pl.DataFrame, pl.LazyFrame, SparkDataFrame)


def decode_json_columns(
    df: Frame,
    column: str,
    new_column: str,
    json_schema: pl.Struct | StructType | None = None,
) -> Frame:
    """Decode the JSON strings in ``column`` into a struct column ``new_column``.

    Args:
        df: Input Polars (eager or lazy) or PySpark DataFrame.
        column: Name of the column holding JSON strings.
        new_column: Name of the decoded struct column to add.
        json_schema: Schema of the JSON payload — a ``pl.Struct`` for Polars
            input or a ``StructType`` for PySpark input; must match the
            backend of ``df``.

    Returns:
        A frame of the same type as ``df`` with ``new_column`` added.

    Raises:
        TypeError: If ``df`` is not a supported frame type, or ``json_schema``
            does not match the frame's backend (including ``None``).
    """
    if isinstance(df, (pl.DataFrame, pl.LazyFrame)) and isinstance(json_schema, pl.Struct):
        return df.with_columns(
            pl.col(column).str.json_decode(dtype=json_schema).alias(new_column)
        )
    if isinstance(df, SparkDataFrame) and isinstance(json_schema, StructType):
        return df.withColumn(
            new_column,
            F.from_json(
                col=column,
                schema=json_schema,
                # PERMISSIVE mode nulls out malformed records instead of failing.
                # These options aren't necessary, but shared as food for thought.
                options={"mode": "PERMISSIVE"},
            ),
        )
    # This branch is also reached when json_schema is None or mismatched with
    # df's backend, so the message names both causes (the original blamed only
    # the frame type).
    raise TypeError(
        "Unsupported DataFrame/schema combination. Expected a Polars frame "
        "with pl.Struct or a PySpark DataFrame with StructType."
    )

Minimized

import polars as pl

import sqlframe.standalone as sql
import sqlframe.standalone.functions as sql_f
import sqlframe.standalone.types as sql_types


def str_json_decode_polars(name: str, dtype: pl.DataType) -> pl.Expr:
    """Build a Polars expression decoding the JSON-string column *name* as *dtype*."""
    json_column = pl.col(name)
    return json_column.str.json_decode(dtype)


def str_json_decode_pyspark(
    name: str, dtype: sql_types.ArrayType | sql_types.StructType
) -> sql.Column:
    """Build a PySpark-style column decoding the JSON-string column *name* as *dtype*."""
    decoded = sql_f.from_json(name, dtype)
    return decoded

This is a feature request for narwhals to support a similar API as polars.

It would be great to be able to provide the schema once, using nw.Schema, and thus avoid this boilerplate and having to pass the schema in per-implementation.

Suggest a solution if possible.

I haven't gotten used to the narwhals codebase yet, so I don't have a solution, but here is at least a small example using the above function that might prove useful.
Note that I've had to add some os.environ and spark.config in order to get spark working on my Mac where I've written this example.

Show suggested solution

from __future__ import annotations

from typing import TypeVar

import polars as pl
import pyspark.sql.functions as F
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Generic frame type: lets decode_json_columns return the same concrete frame
# type it was given (Polars eager/lazy or PySpark DataFrame).
Frame = TypeVar("Frame", pl.DataFrame, pl.LazyFrame, SparkDataFrame)
import os
import sys

# Point PySpark's driver and workers at the current interpreter so the local
# setup (per the author, needed on their Mac) uses this exact Python env.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable


def decode_json_columns(
    df: Frame,
    column: str,
    new_column: str,
    json_schema: pl.Struct | StructType | None = None,
) -> Frame:
    """Decode the JSON strings in ``column`` into a struct column ``new_column``.

    Args:
        df: Input Polars (eager or lazy) or PySpark DataFrame.
        column: Name of the column holding JSON strings.
        new_column: Name of the decoded struct column to add.
        json_schema: Schema of the JSON payload — a ``pl.Struct`` for Polars
            input or a ``StructType`` for PySpark input.

    Raises:
        TypeError: If ``df`` is not a supported frame type, or ``json_schema``
            does not match the frame's backend (including ``None``).
    """
    if isinstance(df, (pl.DataFrame, pl.LazyFrame)) and isinstance(json_schema, pl.Struct):
        return df.with_columns(pl.col(column).str.json_decode(dtype=json_schema).alias(new_column))
    elif isinstance(df, SparkDataFrame) and isinstance(json_schema, StructType):
        return df.withColumn(
            new_column,
            # PERMISSIVE mode nulls out malformed records instead of failing.
            F.from_json(col=column, schema=json_schema, options={"mode": "PERMISSIVE"}),
        )
    else:
        # Also reached when json_schema is None or mismatched with df's
        # backend, so the message names both causes (the original blamed
        # only the frame type).
        raise TypeError(
            "Unsupported DataFrame/schema combination. Expected a Polars frame "
            "with pl.Struct or a PySpark DataFrame with StructType."
        )


def polars_example() -> None:
    """Run the JSON-decode demo against an in-memory Polars frame."""
    frame: pl.DataFrame = pl.DataFrame(
        {
            "id": [1, 2],
            "content": [
                '{"name": "Alice", "age": 30}',
                '{"name": "Bob", "age": 25}',
            ],
        }
    )

    # Schema describing the JSON payload held in the "content" column.
    content_schema: pl.Struct = pl.Struct(
        [
            pl.Field("name", pl.String),
            pl.Field("age", pl.Int64),
        ]
    )

    result: pl.DataFrame = decode_json_columns(
        df=frame,
        column="content",
        new_column="content_struct",
        json_schema=content_schema,
    )

    print("\nPolars result")
    print(result)


def spark_example() -> None:
    """Run the JSON-decode demo against a local PySpark session."""
    # JVM module opens needed for Spark on recent JDKs; same flags are passed
    # to both driver and executors, so bind the string once.
    java_opts = (
        "--add-opens=java.base/java.nio=ALL-UNNAMED "
        "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED "
        "--add-opens=java.base/java.lang=ALL-UNNAMED"
    )
    builder = SparkSession.builder.appName("app")
    builder = builder.config("spark.driver.extraJavaOptions", java_opts)
    builder = builder.config("spark.executor.extraJavaOptions", java_opts)
    spark: SparkSession = builder.getOrCreate()

    rows: list[tuple[int, str]] = [
        (1, '{"name": "Alice", "age": 30}'),
        (2, '{"name": "Bob", "age": 25}'),
    ]
    frame: SparkDataFrame = spark.createDataFrame(rows, ["id", "content"])

    # Schema describing the JSON payload held in the "content" column.
    content_schema: StructType = StructType(
        [
            StructField("name", StringType(), True),
            StructField("age", IntegerType(), True),
        ]
    )

    result: SparkDataFrame = decode_json_columns(
        df=frame,
        column="content",
        new_column="content_struct",
        json_schema=content_schema,
    )

    print("\nPySpark result")
    result.show(truncate=False)


# Guard the demos so importing this module doesn't spin up Spark as a side
# effect; both examples run only when the file is executed as a script.
if __name__ == "__main__":
    polars_example()
    spark_example()

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions