Skip to content

[FLINK-38918][python][kafka connector] support per cluster offset for DynamicKafkaSource in pyflink#28512

Open
bowenli86 wants to merge 5 commits into
apache:masterfrom
bowenli86:dev/bowenli/codex/flink-38918-pyflink-per-cluster-offset
Open

[FLINK-38918][python][kafka connector] support per cluster offset for DynamicKafkaSource in pyflink#28512
bowenli86 wants to merge 5 commits into
apache:masterfrom
bowenli86:dev/bowenli/codex/flink-38918-pyflink-per-cluster-offset

Conversation

@bowenli86

@bowenli86 bowenli86 commented Jun 22, 2026

Copy link
Copy Markdown
Member

What is the purpose of the change

This PR addresses FLINK-38918 by adding PyFlink support for configuring per-cluster starting and stopping offsets for DynamicKafkaSource.

The Java dynamic Kafka connector already supports offset initializers on ClusterMetadata and SingleClusterTopicMetadataService; this change exposes that capability through the PyFlink wrappers.

Brief change log

  • Added a PyFlink ClusterMetadata wrapper that forwards topics, properties, and optional starting/stopping KafkaOffsetsInitializers to the Java ClusterMetadata.
  • Extended SingleClusterTopicMetadataService with optional per-cluster starting and stopping offsets while preserving the existing constructor behavior when offsets are omitted.
  • Exported ClusterMetadata through pyflink.datastream.connectors.
  • Updated the flink-python test Kafka SQL connector dependency to 5.0.0-2.2.
  • Added Python unit tests for full, default, starting-only, and stopping-only per-cluster offset configurations.

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests in flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py for SingleClusterTopicMetadataService offset forwarding.
  • Added unit tests in flink-python/pyflink/datastream/connectors/tests/test_dynamic_kafka.py for ClusterMetadata offset forwarding.
  • Added coverage for backward-compatible default behavior when no per-cluster offsets are provided.
  • Added coverage for partial configurations where only starting offsets or only stopping offsets are provided.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): yes, upgrades a flink-python test-scoped Kafka SQL connector dependency
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes, exposes PyFlink connector API additions
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? Python API docstrings

Was generative AI tooling used to co-author this PR?
  • Yes (Codex GPT-5)

Generated-by: Codex GPT-5

@flinkbot

flinkbot commented Jun 22, 2026

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Comment thread flink-python/pyflink/datastream/connectors/dynamic_kafka.py
Comment thread flink-python/pyflink/datastream/connectors/dynamic_kafka.py Outdated
@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label Jun 23, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants