[Data] Export dataset operator output schema to event logger #60086

Merged
edoakes merged 1 commit into ray-project:master from coqian:coqian/schema
Jan 30, 2026

Conversation

@coqian

@coqian coqian commented Jan 13, 2026

Contributor

Description

In this PR, we export the output schema of dataset operators so that we can check the output field names and their data types for better observability.

If DataContext.enforce_schemas is set to False, the schema is exported only once for each operator; if it is set to True, the schema is exported again whenever the fields are updated.
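
The export policy described above can be sketched roughly as follows. This is only an illustration, not the code in this PR: `SchemaExportTracker` and `should_export` are hypothetical names, and a schema is modelled as a plain dict of field name to type string.

```python
from typing import Dict


class SchemaExportTracker:
    """Hypothetical helper that decides whether an operator's schema should be exported."""

    def __init__(self, enforce_schemas: bool) -> None:
        self._enforce_schemas = enforce_schemas
        # Last exported schema, keyed by operator UUID.
        self._last_exported: Dict[str, Dict[str, str]] = {}

    def should_export(self, operator_uuid: str, schema_fields: Dict[str, str]) -> bool:
        previous = self._last_exported.get(operator_uuid)
        if previous is None:
            # First schema seen for this operator: always export.
            self._last_exported[operator_uuid] = dict(schema_fields)
            return True
        if self._enforce_schemas and previous != schema_fields:
            # Fields changed and re-export on change is enabled.
            self._last_exported[operator_uuid] = dict(schema_fields)
            return True
        return False
```

With `enforce_schemas=False` the tracker returns True only for the first schema of each operator; with `enforce_schemas=True` it also returns True each time the field mapping differs from the last exported one.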

Example export event:

{
  "event_id": "83b3A80eAa283CFFBf",
  "timestamp": 1769111404,
  "source_type": "EXPORT_DATASET_OPERATOR_SCHEMA",
  "event_data": {
    "operator_uuid": "a76411d1-5d28-4027-ad19-56cdcb073410",
    "schema_fields": {
      "int_field": "int64",
      "bool_field": "bool",
      "bytes_field": "binary",
      "string_field": "string",
      "date_field": "date32[day]",
      "datetime_field": "timestamp[us]",
      "numpy_int_field": "int32",
      "numpy_float_field": "double",
      "numpy_array_field": "ArrowTensorType(shape=(3,), dtype=int64)",
      "list_float_field": "list<item: double>",
      "list_list_field": "list<item: list<item: double>>",
      "nested_dict_field": "struct<a: struct<b: string>>",
      "none_field": "null"
    }
  }
}
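
To make the event layout concrete, here is a small sketch of how a consumer might read one such event and pull out the field types. It assumes each event is stored as a single JSON object per line; `parse_schema_event` is a hypothetical helper and not part of Ray.

```python
import json
from typing import Dict, Tuple


def parse_schema_event(line: str) -> Tuple[str, Dict[str, str]]:
    """Return (operator_uuid, schema_fields) from one exported event line."""
    event = json.loads(line)
    if event.get("source_type") != "EXPORT_DATASET_OPERATOR_SCHEMA":
        raise ValueError(f"unexpected source_type: {event.get('source_type')!r}")
    data = event["event_data"]
    return data["operator_uuid"], dict(data["schema_fields"])


# A shortened version of the example event from the PR description.
sample = json.dumps({
    "event_id": "83b3A80eAa283CFFBf",
    "timestamp": 1769111404,
    "source_type": "EXPORT_DATASET_OPERATOR_SCHEMA",
    "event_data": {
        "operator_uuid": "a76411d1-5d28-4027-ad19-56cdcb073410",
        "schema_fields": {"int_field": "int64", "none_field": "null"},
    },
})
operator_uuid, fields = parse_schema_event(sample)
print(operator_uuid, fields["int_field"])
```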

@coqian coqian force-pushed the coqian/schema branch 3 times, most recently from 2fc2b9a to fa41ed2 Compare January 15, 2026 08:02
@coqian coqian marked this pull request as ready for review January 27, 2026 08:20
@coqian coqian requested review from a team as code owners January 27, 2026 08:20
@coqian coqian changed the title [WIP][Data] Export dataset operator output schema to event logger Jan 27, 2026
Comment thread python/ray/data/_internal/operator_schema_exporter.py
Comment thread src/ray/protobuf/export_dataset_operator_schema.proto
@ray-gardener ray-gardener Bot added data Ray Data-related issues observability Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling labels Jan 27, 2026
Comment thread src/ray/protobuf/export_dataset_operator_schema.proto
@coqian coqian force-pushed the coqian/schema branch 2 times, most recently from b819b1a to 2452647 Compare January 27, 2026 16:50
Comment thread python/ray/data/tests/test_operator_schema_export.py Outdated

@iamjustinhsu iamjustinhsu left a comment

Contributor

@coqian thanks for doing this!!

Just some questions for my own understanding

  • Is the event_EXPORT_DATASET_OPERATOR_SCHEMA.log per operator or per dataset?
  • how do you detect changes in the file?
Comment thread python/ray/data/_internal/execution/streaming_executor_state.py Outdated
@@ -0,0 +1,145 @@
"""Exporter API for Ray Data operator schema."""

Contributor

I feel like this file and dataset_state.py can be moved under a new parent directory, thoughts?

Contributor

oh also, i was thinking -- metadata_exporter.py exports operators too. Can you explain the purposes of this exporter vs. that one?

Contributor Author

metadata_exporter is mainly for dataset and operator metadata and the relationships between them. After adding operator args and DataContext, the file can become too large to be exported reliably.
The operator_schema_exporter is specifically for the operator schema and any later updates to it. It can also be large when the schema has many fields or changes many times. The operator schema is also unrelated to dataset/operator state or other metadata, which is why I created a separate exporter for it.

Comment thread python/ray/data/_internal/operator_schema_exporter.py
return LoggerOperatorSchemaExporter.create_if_enabled()


class OperatorSchemaExporter(ABC):

Contributor

You probably know best, but I'm wondering when we would need this, because I can't foresee needing multiple OperatorSchemaExporter implementations.

Contributor Author

This just follows the previous design, such as metadata_exporter.py. Right now we only export to files, but we may add other export methods in the future, e.g. to a database or a web request. Link to previous PR FYI.
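
The design argument above, one abstract exporter with swappable backends, can be sketched as follows. Only the class name `OperatorSchemaExporter` appears in the diff; the method name `export_operator_schema`, the in-memory backend, and `publish` are assumptions made for illustration.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Tuple


class OperatorSchemaExporter(ABC):
    """Abstract exporter; the method name below is an assumption."""

    @abstractmethod
    def export_operator_schema(
        self, operator_uuid: str, schema_fields: Dict[str, str]
    ) -> None:
        ...


class InMemoryOperatorSchemaExporter(OperatorSchemaExporter):
    """Hypothetical backend that keeps events in a list, e.g. for tests."""

    def __init__(self) -> None:
        self.events: List[Tuple[str, Dict[str, str]]] = []

    def export_operator_schema(
        self, operator_uuid: str, schema_fields: Dict[str, str]
    ) -> None:
        self.events.append((operator_uuid, dict(schema_fields)))


def publish(
    exporter: OperatorSchemaExporter,
    operator_uuid: str,
    schema_fields: Dict[str, str],
) -> None:
    # Callers depend only on the abstract interface, so a file, database,
    # or web-request backend could be swapped in without changing them.
    exporter.export_operator_schema(operator_uuid, schema_fields)
```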

Comment thread python/ray/data/_internal/execution/streaming_executor_state.py Outdated
@coqian

coqian commented Jan 28, 2026

Contributor Author

@iamjustinhsu asked:

  • Is the event_EXPORT_DATASET_OPERATOR_SCHEMA.log per operator or per dataset?
  • how do you detect changes in the file?

Answers:

  • Datasets and operators in the same session write their schemas to the same file.
  • We can use other tools, such as vector, to detect file changes.
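
For readers without a log shipper at hand, change detection on an append-only log can be approximated with a simple offset-based poll. This is a sketch under the assumption that the file holds one JSON event per line; `read_new_events` is a hypothetical helper, and the location of the log file is not specified in this thread.

```python
import json
from typing import List, Tuple


def read_new_events(path: str, offset: int = 0) -> Tuple[List[dict], int]:
    """Read complete JSON lines appended to `path` since byte `offset`.

    Returns the parsed events and the offset to pass to the next poll.
    """
    events: List[dict] = []
    with open(path, "rb") as f:
        f.seek(offset)
        while True:
            line = f.readline()
            if not line.endswith(b"\n"):
                # End of file, or a partially written line: retry on the next poll.
                break
            offset = f.tell()
            if line.strip():
                events.append(json.loads(line))
    return events, offset
```

Calling the function repeatedly with the returned offset yields only events written since the previous call. A dedicated tool also handles concerns such as file rotation, which this sketch ignores.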
Comment thread src/ray/protobuf/export_dataset_operator_schema.proto
@coqian coqian force-pushed the coqian/schema branch 3 times, most recently from 0b5296e to d89646b Compare January 29, 2026 22:51
Comment thread src/ray/protobuf/export_dataset_operator_schema.proto

@iamjustinhsu iamjustinhsu left a comment

Contributor

lgtm!

Comment thread python/ray/data/_internal/operator_schema_exporter.py Outdated
Comment thread python/ray/data/_internal/operator_schema_exporter.py Outdated
Comment thread python/ray/data/_internal/operator_schema_exporter.py Outdated
Comment thread python/ray/data/_internal/operator_schema_exporter.py Outdated
return [json.loads(line) for line in data]


def test_export_operator_schema():

Contributor

up to u, but do u think it would be good to parameterize based on schema types? Here is an example of the ones we use for sanitization. Just wanted to cover all the cases, but no need to do that if u feel it's redundant

@coqian coqian Jan 30, 2026

Contributor Author

This would introduce a dependency on how ToString() is implemented for the different types in PyArrow. If that is updated in the future, it may break our test cases here. Different versions can also produce different results: for example, after the datetime64[s] dtype change in pandas 2.0, printing the dtype gives datetime64[ns] before 2.0 and datetime64[s] from 2.0 onward. Our test cases focus on whether the schema can be exported to the log file instead.
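
A version-tolerant check along these lines could look like the sketch below. It is not the test in this PR; `check_schema_event` is a hypothetical helper that verifies the shape of an exported event without pinning the exact type strings.

```python
from typing import Dict, Iterable


def check_schema_event(event: Dict, expected_fields: Iterable[str]) -> None:
    """Assert the event's structure without pinning exact type strings."""
    assert event["source_type"] == "EXPORT_DATASET_OPERATOR_SCHEMA"
    schema_fields = event["event_data"]["schema_fields"]
    # Field names are stable across library versions, so compare them exactly.
    assert set(schema_fields) == set(expected_fields)
    # Only require that each type was rendered as a non-empty string.
    assert all(isinstance(t, str) and t for t in schema_fields.values())
```

Because the check ignores how each type is rendered, it passes whether a field is reported as `datetime64[ns]` or `datetime64[s]`.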

Comment thread python/ray/data/_internal/operator_schema_exporter.py

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Comment thread src/ray/protobuf/export_dataset_operator_schema.proto
Signed-off-by: cong.qian <cong.qian@anyscale.com>
@MengjinYan

Contributor

@edoakes for review and merge of the proto, as it seems the core team owns the proto code.

Looks like the PR updates the old export event schema for data events. I think we will need to keep the export event schema until core supports this feature for library events.

@edoakes edoakes added the go add ONLY when ready to merge, run all tests label Jan 30, 2026
@edoakes edoakes enabled auto-merge (squash) January 30, 2026 20:21
@edoakes edoakes merged commit 216182b into ray-project:master Jan 30, 2026
8 checks passed
elliot-barn pushed a commit that referenced this pull request Feb 9, 2026
ans9868 pushed a commit to ans9868/ray that referenced this pull request Feb 18, 2026

Labels

data: Ray Data-related issues
go: add ONLY when ready to merge, run all tests
observability: Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling

4 participants