Skip to content

[Data] Get rid of generators to avoid intermediate state pinning#60598

Merged
alexeykudinkin merged 22 commits into
masterfrom
ak/map-tfm-ref-rel
Feb 13, 2026
Merged

[Data] Get rid of generators to avoid intermediate state pinning#60598
alexeykudinkin merged 22 commits into
masterfrom
ak/map-tfm-ref-rel

Conversation

@alexeykudinkin

@alexeykudinkin alexeykudinkin commented Jan 30, 2026

Copy link
Copy Markdown
Contributor

Description

I’ve realized that for fused Map transforms we’re holding a whole stack of intermediate results (batches) simply due to how yield works in Python:

  • When method yields all of its frame state (local vars) is preserved, pinning all of its intermediate state till the next iteration and not releasing it.

  • This is in contrast with the pure Iterator.__next__ method, returning from which, stack frame with all of its intermediate state is destroyed.

While this is not an issue most of the time, it's a big problem in cases when multiple Maps are fused:

  • With multiple operators & corresponding transformations being fused
  • Intermediate state along with inputs and outputs of each one are pinned until the next iteration
  • Total size of required heap memory scales up proportionally to the # of operators fused (ie more operators more heap)
  • This is exacerbated by the fact that now batch_size is None by default meaning that the whole block is an input and an output substantially increasing memory requirements.

Consider following example:

Generator Chain (Problem)                                                                                                                                                     
                                                                                                                                                                                
  ┌─────────────────────────────────────────────────────────────────────────────┐                                                                                               
  │  Generator A                                                                │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def transform_a(inputs):                                              │ │                                                                                               
  │  │      for batch in inputs:           ◄─── suspended at yield            │ │                                                                                               
  │  │          result = process(batch)         `batch` PINNED in frame       │ │                                                                                               
  │  │          yield result               ◄─── `result` PINNED in frame      │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def transform_b(inputs):                                              │ │                                                                                               
  │  │      for batch in inputs:           ◄─── suspended at yield            │ │                                                                                               
  │  │          result = process(batch)         `batch` PINNED (output of A)  │ │                                                                                               
  │  │          yield result               ◄─── `result` PINNED in frame      │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def transform_c(inputs):                                              │ │                                                                                               
  │  │      for batch in inputs:           ◄─── suspended at yield            │ │                                                                                               
  │  │          result = process(batch)         `batch` PINNED (output of B)  │ │                                                                                               
  │  │          yield result               ◄─── `result` PINNED in frame      │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │                      to consumer                                            │                                                                                               
  └─────────────────────────────────────────────────────────────────────────────┘                                                                                               
                                                                                                                                                                                
  Memory at yield point:                                                                                                                                                        
  ┌─────────┬─────────┬─────────┬─────────┬─────────┬─────────┐                                                                                                                 
  │ input   │ A.batch │ A.result│ B.batch │ B.result│ C.batch │ ... ALL PINNED                                                                                                  
  └─────────┴─────────┴─────────┴─────────┴─────────┴─────────┘                                                                                                                 
             ═══════════════════════════════════════════════                                                                                                                    
                      Cannot be GC'd until next iteration                                                                                                                       
                                                                                                                                                                                
  Iterator Chain (Solution)                                                                                                                                                     
                                                                                                                                                                                
  ┌─────────────────────────────────────────────────────────────────────────────┐                                                                                               
  │  Iterator A                                                                 │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def __next__(self):                                                   │ │                                                                                               
  │  │      batch = next(self._input)      # local var                        │ │                                                                                               
  │  │      result = process(batch)        # local var                        │ │                                                                                               
  │  │      return result                  ◄─── method RETURNS                │ │                                                                                               
  │  │                                          locals GO OUT OF SCOPE        │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def __next__(self):                                                   │ │                                                                                               
  │  │      batch = next(self._input)      # local var                        │ │                                                                                               
  │  │      result = process(batch)        # local var                        │ │                                                                                               
  │  │      return result                  ◄─── method RETURNS                │ │                                                                                               
  │  │                                          locals GO OUT OF SCOPE        │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def __next__(self):                                                   │ │                                                                                               
  │  │      batch = next(self._input)      # local var                        │ │                                                                                               
  │  │      result = process(batch)        # local var                        │ │                                                                                               
  │  │      return result                  ◄─── method RETURNS                │ │                                                                                               
  │  │                                          locals GO OUT OF SCOPE        │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │                      to consumer                                            │                                                                                               
  └─────────────────────────────────────────────────────────────────────────────┘                                                                                               
                                                                                                                                                                                
  Memory after return:                                                                                                                                                          
  ┌─────────┬─────────┐                                                                                                                                                         
  │ input   │ output  │  ... ONLY 2 objects pinned                                                                                                                              
  └─────────┴─────────┘                                                                                                                                                         
                                                                                                                                                                                
  All intermediates eligible for GC immediately after each __next__ returns                                                                                                     
                                                                                                                                                                                
  Key Difference                                                                                                                                                                
                                                                                                                                                                                
  GENERATOR                              ITERATOR                                                                                                                               
  ─────────────────────────────────────────────────────────────────                                                                                                             
  yield suspends execution         vs    return completes execution                                                                                                             
  frame stays alive                vs    frame is destroyed                                                                                                                     
  locals pinned until resume       vs    locals released immediately                                                                                                            
                                                                                                                                                                                
             ┌──────────┐                        ┌──────────┐                                                                                                                   
    yield ──►│ SUSPENDED│               return ──►│ COMPLETE │                                                                                                                  
             │  frame   │                        │  frame   │                                                                                                                   
             │  alive   │                        │destroyed │                                                                                                                   
             └──────────┘                        └──────────┘                                                                                                                   
                 │                                    │                                                                                                                         
                 ▼                                    ▼                                                                                                                         
           refs HELD                            refs RELEASED    

Related issues

Link related issues: "Fixes #1234", "Closes #1234", or "Related to #1234".

Additional information

Optional: Add implementation details, API changes, usage examples, screenshots, etc.

@alexeykudinkin alexeykudinkin requested a review from a team as a code owner January 30, 2026 04:17
@alexeykudinkin alexeykudinkin added the go add ONLY when ready to merge, run all tests label Jan 30, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request is a solid piece of engineering that refactors the data processing pipeline to replace generator functions with iterator classes. This is a crucial change to prevent potential memory leaks caused by chained generators holding references to intermediate data. The changes are applied consistently across block_batching and map_transformer components. New iterator classes like _BatchingIterator, ShapeBlocksIterator, and _TransformingBatchIterator are introduced to encapsulate the iteration logic previously found in generators. A new test, test_chained_transforms_release_intermediates_between_batches, is added to verify that intermediate object references are correctly released, which is an excellent addition. The overall change is well-executed and improves memory management in Ray Data's critical path.

res = [batch]
out_batch = next(self._cur_output_iter)
except StopIteration:
pass

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For improved clarity and robustness, it's better to explicitly reset self._cur_output_iter to None and continue the loop when the iterator is exhausted. This makes the state transition explicit and avoids relying on the iterator being overwritten later in the loop.

Suggested change
pass
self._cur_output_iter = None
continue
Comment thread python/ray/data/_internal/block_batching/block_batching.py Outdated
Comment thread python/ray/data/_internal/block_batching/block_batching.py Outdated
Comment thread python/ray/data/tests/test_map_transformer.py Outdated
Comment thread python/ray/data/tests/test_map_transformer.py Outdated
Comment thread python/ray/data/_internal/block_batching/block_batching.py
Comment thread python/ray/data/_internal/block_batching/block_batching.py Outdated
@alexeykudinkin alexeykudinkin changed the title [WIP][Data] Get rid of generators on the critical path Jan 30, 2026
@alexeykudinkin alexeykudinkin changed the title [Data] Get rid of generators on the critical path Jan 30, 2026
Comment thread python/ray/data/_internal/execution/operators/map_transformer.py
@ray-gardener ray-gardener Bot added the data Ray Data-related issues label Jan 30, 2026
@raulchen

Copy link
Copy Markdown
Contributor

The idea and motivation look reasonable. Have you done any benchmarks on real workloads? E.g., how much memory can we save?

@alexeykudinkin

Copy link
Copy Markdown
Contributor Author

The idea and motivation look reasonable. Have you done any benchmarks on real workloads? E.g., how much memory can we save?

Not yet. But we can math it out:

Currently we're using per single Map task

block-size (128Mb default) x N (number of fused transformations) 

With this change it will be just the block-size

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

try:
return next(self._input)
finally:
self._transformer._report_udf_time(time.perf_counter() - start)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UDF timing records time even when exceptions occur

Medium Severity

The new _UDFTimingIterator.__next__ uses a finally block to record UDF time, which means timing is recorded even when next(self._input) raises an exception. The old _udf_timed_iter only recorded timing after a successful next() call (the timing increment came after the next() returned). This changes timing metrics behavior: if UDFs fail repeatedly, the new code accumulates time for each failure while the old code wouldn't record time for failures.

Fix in Cursor Fix in Web

@iamjustinhsu iamjustinhsu left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changes make sense to me, do u have any release test results?

self._active_timer.__enter__()


class _UnwrappingIterator(Iterator[DataBatch]):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the interest of simplication, do u think we can just fold this implementation into _UserTimingIterator since it's not being used elsewhere and all it does is index into .data?

self._input = input
self._transformer = transformer

def __iter__(self) -> "MapTransformer._UDFTimingIterator":

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this a lot returning self. What are ur thoughts on making it default in the base class?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have a base class though

return _UserTimingIterator(_UnwrappingIterator(batch_iter), stats)


class _UserTimingIterator(Iterator[DataBatch]):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since most classes have a docstring, I think you should also add one here too

@iamjustinhsu

iamjustinhsu commented Feb 2, 2026

Copy link
Copy Markdown
Contributor

I'm curious if what u are doing can be simplified if we just do this:

┌─────────────────────────────────────────────────────────────────────────────┐                                                                                               
  │  Generator A                                                                │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def transform_a(inputs):                                              │ │                                                                                               
  │  │      for batch in inputs:           ◄─── suspended at yield            │ │                                                                                               
  │  │          batch = process(batch)         `batch` PINNED in frame       │ │                                                                                               
  │  │          yield batch               ◄─── `result` PINNED in frame      │ │          
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                        

But I guess it still keeps batch and stack in frame

@raulchen

raulchen commented Feb 2, 2026

Copy link
Copy Markdown
Contributor

The idea and motivation look reasonable. Have you done any benchmarks on real workloads? E.g., how much memory can we save?

Not yet. But we can math it out:

Currently we're using per single Map task

block-size (128Mb default) x N (number of fused transformations) 

With this change it will be just the block-size

that I agree. but I'd still prefer verifying the effectiveness with a test. It could be a simple microbenchmark with multiple fused ops + large target block size.
(We don't measure memory in the release tests. Would be nice to also add that. but not a blocker for this PR)

Also, another concern I have is that this PR creates too many ad-hoc iterator classes. It'd be nice to unify some of them. E.g. some of them follow the same pattern and can be replaced with this

  class MappingIterator(Iterator[T]):
      """Iterator that applies a function to each item.
      
      Unlike a generator, locals are released when __next__ returns.
      """
      __slots__ = ('_source', '_fn', '_timer_fn')

      def __init__(
          self,
          source: Iterable[S],
          fn: Callable[[S], T],
          timer_fn: Optional[Callable[[], ContextManager]] = None,
      ):
          self._source = iter(source)
          self._fn = fn
          self._timer_fn = timer_fn

      def __iter__(self) -> "MappingIterator[T]":
          return self

      def __next__(self) -> T:
          item = next(self._source)
          if self._timer_fn:
              with self._timer_fn():
                  return self._fn(item)
          return self._fn(item)
@alexeykudinkin

Copy link
Copy Markdown
Contributor Author

Thanks for raising great points @raulchen @iamjustinhsu.

Will get back to this one after the release

@alexeykudinkin

Copy link
Copy Markdown
Contributor Author

that I agree. but I'd still prefer verifying the effectiveness with a test. It could be a simple microbenchmark with multiple fused ops + large target block size.
(We don't measure memory in the release tests. Would be nice to also add that. but not a blocker for this PR)

I've added the unit test for that, which asserts that we don't keep intermediate states anymore.

Expectation now is that peak memory should be lower for cases

  • With Map fusion
  • Larger batch sizes (or batch_size=none)
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
…potentially pinning these objects in memory until next iteration

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
… of generators

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>

# Conflicts:
#	python/ray/data/_internal/execution/operators/map_operator.py
#	python/ray/data/_internal/execution/operators/map_transformer.py

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Tidying up

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Rebased iters onto MO

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
@alexeykudinkin alexeykudinkin enabled auto-merge (squash) February 13, 2026 03:01
@alexeykudinkin alexeykudinkin merged commit 89148a4 into master Feb 13, 2026
7 checks passed
@alexeykudinkin alexeykudinkin deleted the ak/map-tfm-ref-rel branch February 13, 2026 03:40
ans9868 pushed a commit to ans9868/ray that referenced this pull request Feb 18, 2026
…-project#60598)

## Description

I’ve realized that for fused Map transforms we’re holding a whole stack
of intermediate results (batches) simply due to how yield works in
Python:

- When method yields all of its frame state (local vars) is preserved,
pinning all of its intermediate state till the next iteration and not
releasing it.

- This is in contrast with the pure `Iterator.__next__` method,
returning from which, stack frame with all of its intermediate state is
destroyed.

While this is not an issue most of the time, it's a big problem in cases
when multiple Maps are fused:

 - With multiple operators & corresponding transformations being fused
- Intermediate state along with inputs and outputs of each one are
pinned until the next iteration
- Total size of required heap memory scales up proportionally to the #
of operators fused (ie more operators more heap)
- This is exacerbated by the fact that now `batch_size` is None by
default meaning that the whole block is an input and an output
substantially increasing memory requirements.

Consider following example:

```
Generator Chain (Problem)

  ┌─────────────────────────────────────────────────────────────────────────────┐
  │  Generator A                                                                │
  │  ┌────────────────────────────────────────────────────────────────────────┐ │
  │  │  def transform_a(inputs):                                              │ │
  │  │      for batch in inputs:           ◄─── suspended at yield            │ │
  │  │          result = process(batch)         `batch` PINNED in frame       │ │
  │  │          yield result               ◄─── `result` PINNED in frame      │ │
  │  └────────────────────────────────────────────────────────────────────────┘ │
  │                           │                                                 │
  │                           ▼                                                 │
  │  ┌────────────────────────────────────────────────────────────────────────┐ │
  │  │  def transform_b(inputs):                                              │ │
  │  │      for batch in inputs:           ◄─── suspended at yield            │ │
  │  │          result = process(batch)         `batch` PINNED (output of A)  │ │
  │  │          yield result               ◄─── `result` PINNED in frame      │ │
  │  └────────────────────────────────────────────────────────────────────────┘ │
  │                           │                                                 │
  │                           ▼                                                 │
  │  ┌────────────────────────────────────────────────────────────────────────┐ │
  │  │  def transform_c(inputs):                                              │ │
  │  │      for batch in inputs:           ◄─── suspended at yield            │ │
  │  │          result = process(batch)         `batch` PINNED (output of B)  │ │
  │  │          yield result               ◄─── `result` PINNED in frame      │ │
  │  └────────────────────────────────────────────────────────────────────────┘ │
  │                           │                                                 │
  │                           ▼                                                 │
  │                      to consumer                                            │
  └─────────────────────────────────────────────────────────────────────────────┘

  Memory at yield point:
  ┌─────────┬─────────┬─────────┬─────────┬─────────┬─────────┐
  │ input   │ A.batch │ A.result│ B.batch │ B.result│ C.batch │ ... ALL PINNED
  └─────────┴─────────┴─────────┴─────────┴─────────┴─────────┘
             ═══════════════════════════════════════════════
                      Cannot be GC'd until next iteration

  Iterator Chain (Solution)

  ┌─────────────────────────────────────────────────────────────────────────────┐
  │  Iterator A                                                                 │
  │  ┌────────────────────────────────────────────────────────────────────────┐ │
  │  │  def __next__(self):                                                   │ │
  │  │      batch = next(self._input)      # local var                        │ │
  │  │      result = process(batch)        # local var                        │ │
  │  │      return result                  ◄─── method RETURNS                │ │
  │  │                                          locals GO OUT OF SCOPE        │ │
  │  └────────────────────────────────────────────────────────────────────────┘ │
  │                           │                                                 │
  │                           ▼                                                 │
  │  ┌────────────────────────────────────────────────────────────────────────┐ │
  │  │  def __next__(self):                                                   │ │
  │  │      batch = next(self._input)      # local var                        │ │
  │  │      result = process(batch)        # local var                        │ │
  │  │      return result                  ◄─── method RETURNS                │ │
  │  │                                          locals GO OUT OF SCOPE        │ │
  │  └────────────────────────────────────────────────────────────────────────┘ │
  │                           │                                                 │
  │                           ▼                                                 │
  │  ┌────────────────────────────────────────────────────────────────────────┐ │
  │  │  def __next__(self):                                                   │ │
  │  │      batch = next(self._input)      # local var                        │ │
  │  │      result = process(batch)        # local var                        │ │
  │  │      return result                  ◄─── method RETURNS                │ │
  │  │                                          locals GO OUT OF SCOPE        │ │
  │  └────────────────────────────────────────────────────────────────────────┘ │
  │                           │                                                 │
  │                           ▼                                                 │
  │                      to consumer                                            │
  └─────────────────────────────────────────────────────────────────────────────┘

  Memory after return:
  ┌─────────┬─────────┐
  │ input   │ output  │  ... ONLY 2 objects pinned
  └─────────┴─────────┘

  All intermediates eligible for GC immediately after each __next__ returns

  Key Difference

  GENERATOR                              ITERATOR
  ─────────────────────────────────────────────────────────────────
  yield suspends execution         vs    return completes execution
  frame stays alive                vs    frame is destroyed
  locals pinned until resume       vs    locals released immediately

             ┌──────────┐                        ┌──────────┐
    yield ──►│ SUSPENDED│               return ──►│ COMPLETE │
             │  frame   │                        │  frame   │
             │  alive   │                        │destroyed │
             └──────────┘                        └──────────┘
                 │                                    │
                 ▼                                    ▼
           refs HELD                            refs RELEASED
```

## Related issues
> Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to
ray-project#1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

---------

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Adel Nour <ans9868@nyu.edu>
Aydin-ab pushed a commit to kunling-anyscale/ray that referenced this pull request Feb 20, 2026
…-project#60598)

## Description

I’ve realized that for fused Map transforms we’re holding a whole stack
of intermediate results (batches) simply due to how yield works in
Python:

- When method yields all of its frame state (local vars) is preserved,
pinning all of its intermediate state till the next iteration and not
releasing it.

- This is in contrast with the pure `Iterator.__next__` method,
returning from which, stack frame with all of its intermediate state is
destroyed.


While this is not an issue most of the time, it's a big problem in cases
when multiple Maps are fused:

 - With multiple operators & corresponding transformations being fused 
- Intermediate state along with inputs and outputs of each one are
pinned until the next iteration
- Total size of required heap memory scales up proportionally to the #
of operators fused (ie more operators more heap)
- This is exacerbated by the fact that now `batch_size` is None by
default meaning that the whole block is an input and an output
substantially increasing memory requirements.

Consider following example:

```
Generator Chain (Problem)                                                                                                                                                     
                                                                                                                                                                                
  ┌─────────────────────────────────────────────────────────────────────────────┐                                                                                               
  │  Generator A                                                                │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def transform_a(inputs):                                              │ │                                                                                               
  │  │      for batch in inputs:           ◄─── suspended at yield            │ │                                                                                               
  │  │          result = process(batch)         `batch` PINNED in frame       │ │                                                                                               
  │  │          yield result               ◄─── `result` PINNED in frame      │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def transform_b(inputs):                                              │ │                                                                                               
  │  │      for batch in inputs:           ◄─── suspended at yield            │ │                                                                                               
  │  │          result = process(batch)         `batch` PINNED (output of A)  │ │                                                                                               
  │  │          yield result               ◄─── `result` PINNED in frame      │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def transform_c(inputs):                                              │ │                                                                                               
  │  │      for batch in inputs:           ◄─── suspended at yield            │ │                                                                                               
  │  │          result = process(batch)         `batch` PINNED (output of B)  │ │                                                                                               
  │  │          yield result               ◄─── `result` PINNED in frame      │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │                      to consumer                                            │                                                                                               
  └─────────────────────────────────────────────────────────────────────────────┘                                                                                               
                                                                                                                                                                                
  Memory at yield point:                                                                                                                                                        
  ┌─────────┬─────────┬─────────┬─────────┬─────────┬─────────┐                                                                                                                 
  │ input   │ A.batch │ A.result│ B.batch │ B.result│ C.batch │ ... ALL PINNED                                                                                                  
  └─────────┴─────────┴─────────┴─────────┴─────────┴─────────┘                                                                                                                 
             ═══════════════════════════════════════════════                                                                                                                    
                      Cannot be GC'd until next iteration                                                                                                                       
                                                                                                                                                                                
  Iterator Chain (Solution)                                                                                                                                                     
                                                                                                                                                                                
  ┌─────────────────────────────────────────────────────────────────────────────┐                                                                                               
  │  Iterator A                                                                 │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def __next__(self):                                                   │ │                                                                                               
  │  │      batch = next(self._input)      # local var                        │ │                                                                                               
  │  │      result = process(batch)        # local var                        │ │                                                                                               
  │  │      return result                  ◄─── method RETURNS                │ │                                                                                               
  │  │                                          locals GO OUT OF SCOPE        │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def __next__(self):                                                   │ │                                                                                               
  │  │      batch = next(self._input)      # local var                        │ │                                                                                               
  │  │      result = process(batch)        # local var                        │ │                                                                                               
  │  │      return result                  ◄─── method RETURNS                │ │                                                                                               
  │  │                                          locals GO OUT OF SCOPE        │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │  ┌────────────────────────────────────────────────────────────────────────┐ │                                                                                               
  │  │  def __next__(self):                                                   │ │                                                                                               
  │  │      batch = next(self._input)      # local var                        │ │                                                                                               
  │  │      result = process(batch)        # local var                        │ │                                                                                               
  │  │      return result                  ◄─── method RETURNS                │ │                                                                                               
  │  │                                          locals GO OUT OF SCOPE        │ │                                                                                               
  │  └────────────────────────────────────────────────────────────────────────┘ │                                                                                               
  │                           │                                                 │                                                                                               
  │                           ▼                                                 │                                                                                               
  │                      to consumer                                            │                                                                                               
  └─────────────────────────────────────────────────────────────────────────────┘                                                                                               
                                                                                                                                                                                
  Memory after return:                                                                                                                                                          
  ┌─────────┬─────────┐                                                                                                                                                         
  │ input   │ output  │  ... ONLY 2 objects pinned                                                                                                                              
  └─────────┴─────────┘                                                                                                                                                         
                                                                                                                                                                                
  All intermediates eligible for GC immediately after each __next__ returns                                                                                                     
                                                                                                                                                                                
  Key Difference                                                                                                                                                                
                                                                                                                                                                                
  GENERATOR                              ITERATOR                                                                                                                               
  ─────────────────────────────────────────────────────────────────                                                                                                             
  yield suspends execution         vs    return completes execution                                                                                                             
  frame stays alive                vs    frame is destroyed                                                                                                                     
  locals pinned until resume       vs    locals released immediately                                                                                                            
                                                                                                                                                                                
             ┌──────────┐                        ┌──────────┐                                                                                                                   
    yield ──►│ SUSPENDED│               return ──►│ COMPLETE │                                                                                                                  
             │  frame   │                        │  frame   │                                                                                                                   
             │  alive   │                        │destroyed │                                                                                                                   
             └──────────┘                        └──────────┘                                                                                                                   
                 │                                    │                                                                                                                         
                 ▼                                    ▼                                                                                                                         
           refs HELD                            refs RELEASED    
```

## Related issues
> Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to
ray-project#1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

---------

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

3 participants