Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dev-support/Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pipeline {
environment {
YETUS='yetus'
// Branch or tag name. Yetus release tags are 'rel/X.Y.Z'
YETUS_VERSION='rel/0.14.0'
YETUS_VERSION='a7d29a6a72750a0c5c39512f33945e773e69303e'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not forget to revert this in a follow-up, once this gets merged.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @brumi1024 for the review!

}

parameters {
Expand All @@ -71,7 +71,7 @@ pipeline {
checkout([
$class: 'GitSCM',
branches: [[name: "${env.YETUS_VERSION}"]],
userRemoteConfigs: [[ url: 'https://github.com/apache/yetus.git']]]
userRemoteConfigs: [[ url: 'https://github.com/ayushtkn/yetus.git']]]
)
}
}
Expand Down
7 changes: 7 additions & 0 deletions hadoop-client-modules/hadoop-client-minicluster/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,13 @@
<exclude>**/*.java</exclude>
</excludes>
</filter>
<!-- Some of our dependencies include html, so remove it. -->
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>**/*.html</exclude>
</excludes>
</filter>
<!-- We pull in several test jars; keep out the actual test classes -->
<filter>
<artifact>*:*</artifact>
Expand Down
5 changes: 5 additions & 0 deletions hadoop-project/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2146,6 +2146,11 @@
<artifactId>jersey-media-json-jettison</artifactId>
<version>${jersey2.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-moxy</artifactId>
<version>${jersey2.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jaxb</groupId>
<artifactId>jaxb-runtime</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ public void setUp() throws Exception {
}

public final WebTarget targetWithJsonObject() {
return target().register(new JettisonObjectProvider.App());
return target();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@
</dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jettison</artifactId>
<artifactId>jersey-media-moxy</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,23 @@

import java.io.StringWriter;

import javax.ws.rs.core.MediaType;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;

import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.JAXBContextResolver;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;

import org.eclipse.persistence.jaxb.MarshallerProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.glassfish.jersey.jettison.JettisonJaxbContext;
import org.glassfish.jersey.jettison.JettisonMarshaller;

/**
* Periodic heart beat from a <code>ResourceManager</code> participating in
* federation to indicate liveliness. The heart beat publishes the current
Expand All @@ -48,12 +52,18 @@ public class FederationStateStoreHeartbeat implements Runnable {
private final FederationStateStore stateStoreService;
private final ResourceScheduler rs;
private String capability;
private JAXBContextResolver resolver;

public FederationStateStoreHeartbeat(SubClusterId subClusterId,
FederationStateStore stateStoreClient, ResourceScheduler scheduler) {
public FederationStateStoreHeartbeat(
SubClusterId subClusterId,
FederationStateStore stateStoreClient,
ResourceScheduler scheduler,
JAXBContextResolver resolver
) {
this.stateStoreService = stateStoreClient;
this.subClusterId = subClusterId;
this.rs = scheduler;
this.resolver = resolver;
LOG.info("Initialized Federation membership for cluster with timestamp: {}. ",
ResourceManager.getClusterTimeStamp());
}
Expand All @@ -66,12 +76,11 @@ private void updateClusterState() {
try {
// get the current state
ClusterMetricsInfo clusterMetricsInfo = new ClusterMetricsInfo(rs);

JettisonJaxbContext jettisonJaxbContext = new JettisonJaxbContext(ClusterMetricsInfo.class);
JettisonMarshaller jsonMarshaller = jettisonJaxbContext.createJsonMarshaller();
JAXBContext context = resolver.getContext(ClusterMetricsInfo.class);
Marshaller marshaller = context.createMarshaller();
marshaller.setProperty(MarshallerProperties.MEDIA_TYPE, MediaType.APPLICATION_JSON);
StringWriter stringWriter = new StringWriter();
jsonMarshaller.marshallToJSON(clusterMetricsInfo, stringWriter);

marshaller.marshal(clusterMetricsInfo, stringWriter);
capability = stringWriter.toString();
} catch (Exception e) {
LOG.warn("Exception while trying to generate cluster state,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.JAXBContextResolver;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
Expand Down Expand Up @@ -121,6 +122,7 @@ public class FederationStateStoreService extends AbstractService
private String cleanUpThreadNamePrefix = "FederationStateStoreService-Clean-Thread";
private int cleanUpRetryCountNum;
private long cleanUpRetrySleepTime;
private JAXBContextResolver resolver;

public FederationStateStoreService(RMContext rmContext) {
super(FederationStateStoreService.class.getName());
Expand Down Expand Up @@ -182,6 +184,8 @@ protected void serviceInit(Configuration conf) throws Exception {
this.metrics = FederationStateStoreServiceMetrics.getMetrics();
LOG.info("Initialized federation statestore service metrics.");

this.resolver = new JAXBContextResolver(conf);

super.serviceInit(conf);
}

Expand Down Expand Up @@ -252,7 +256,7 @@ private void registerAndInitializeHeartbeat() {
"Failed to register Federation membership with the StateStore", e);
}
stateStoreHeartbeat = new FederationStateStoreHeartbeat(subClusterId,
stateStoreClient, rmContext.getScheduler());
stateStoreClient, rmContext.getScheduler(), resolver);
scheduledExecutorService =
HadoopExecutors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleWithFixedDelay(stateStoreHeartbeat,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@

package org.apache.hadoop.yarn.server.resourcemanager.webapp;

import org.glassfish.jersey.jettison.JettisonJaxbContext;
import org.eclipse.persistence.jaxb.JAXBContextFactory;
import org.eclipse.persistence.jaxb.MarshallerProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;

import javax.inject.Inject;
import javax.inject.Singleton;
Expand All @@ -35,138 +35,43 @@
import javax.xml.bind.JAXBContext;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UserInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FifoSchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.UserMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.UsersInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerHealthInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerQueueInfoList;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInformationsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueAclsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueAclInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ContainerLaunchContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LocalResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.webapp.RemoteExceptionData;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.jsonprovider.ClassSerialisationConfig;

@Singleton
@Provider
public class JAXBContextResolver implements ContextResolver<JAXBContext> {

private static final Logger LOG = LoggerFactory.getLogger(JAXBContextResolver.class.getName());

private final Map<Class, JAXBContext> typesContextMap;
private final Map<Class, JAXBContext> typesContextMap = new HashMap<>();

public JAXBContextResolver() throws Exception {
this(new Configuration());
}

@Inject
public JAXBContextResolver(@javax.inject.Named("conf") Configuration conf) throws Exception {

JAXBContext context;
JAXBContext unWrappedRootContext;

// you have to specify all the dao classes here
final Class[] cTypes =
{ AppInfo.class, AppAttemptInfo.class, AppAttemptsInfo.class,
ClusterInfo.class, CapacitySchedulerQueueInfo.class,
FifoSchedulerInfo.class, SchedulerTypeInfo.class, NodeInfo.class,
UserMetricsInfo.class, CapacitySchedulerInfo.class,
ClusterMetricsInfo.class, SchedulerInfo.class, AppsInfo.class,
NodesInfo.class, RemoteExceptionData.class,
CapacitySchedulerQueueInfoList.class, ResourceInfo.class,
UsersInfo.class, UserInfo.class, ApplicationStatisticsInfo.class,
StatisticsItemInfo.class, CapacitySchedulerHealthInfo.class,
FairSchedulerQueueInfoList.class, AppTimeoutsInfo.class,
AppTimeoutInfo.class, ResourceInformationsInfo.class,
ActivitiesInfo.class, AppActivitiesInfo.class,
QueueAclsInfo.class, QueueAclInfo.class,
BulkActivitiesInfo.class};

// these dao classes need root unwrapping
final Class[] rootUnwrappedTypes =
{ NewApplication.class, ApplicationSubmissionContextInfo.class,
ContainerLaunchContextInfo.class, LocalResourceInfo.class,
DelegationToken.class, AppQueue.class, AppPriority.class,
ResourceOptionInfo.class };

ArrayList<Class> finalcTypesList = new ArrayList<>();
ArrayList<Class> finalRootUnwrappedTypesList = new ArrayList<>();

Collections.addAll(finalcTypesList, cTypes);
Collections.addAll(finalRootUnwrappedTypesList, rootUnwrappedTypes);

// Add Custom DAO Classes
Class[] daoClasses = null;
Class[] unwrappedDaoClasses = null;
boolean loadCustom = true;
try {
daoClasses = conf
.getClasses(YarnConfiguration.YARN_HTTP_WEBAPP_CUSTOM_DAO_CLASSES);
unwrappedDaoClasses = conf.getClasses(
YarnConfiguration.YARN_HTTP_WEBAPP_CUSTOM_UNWRAPPED_DAO_CLASSES);
} catch (Exception e) {
LOG.warn("Failed to load custom dao class: ", e);
loadCustom = false;
}

if (loadCustom) {
if (daoClasses != null) {
Collections.addAll(finalcTypesList, daoClasses);
LOG.debug("Added custom dao classes: {}.", Arrays.toString(daoClasses));
}
if (unwrappedDaoClasses != null) {
Collections.addAll(finalRootUnwrappedTypesList, unwrappedDaoClasses);
LOG.debug("Added custom Unwrapped dao classes: {}", Arrays.toString(unwrappedDaoClasses));
}
}

final Class[] finalcTypes = finalcTypesList
.toArray(new Class[finalcTypesList.size()]);
final Class[] finalRootUnwrappedTypes = finalRootUnwrappedTypesList
.toArray(new Class[finalRootUnwrappedTypesList.size()]);

this.typesContextMap = new HashMap<>();
context = new JettisonJaxbContext(finalcTypes);
unWrappedRootContext = new JettisonJaxbContext(finalRootUnwrappedTypes);
for (Class type : finalcTypes) {
typesContextMap.put(type, context);
}
for (Class type : finalRootUnwrappedTypes) {
typesContextMap.put(type, unWrappedRootContext);
}
ClassSerialisationConfig classSerialisationConfig = new ClassSerialisationConfig(conf);
Set<Class<?>> wrappedClasses = classSerialisationConfig.getWrappedClasses();
Set<Class<?>> unWrappedClasses = classSerialisationConfig.getUnWrappedClasses();

//WARNING: AFAIK these properties not respected by MOXyJsonProvider
//For details check MOXyJsonProvider#readFrom method
JAXBContext wrappedContext = JAXBContextFactory.createContext(
wrappedClasses.toArray(new Class[0]),
Collections.singletonMap(MarshallerProperties.JSON_INCLUDE_ROOT, true)
);
JAXBContext unWrappedContext = JAXBContextFactory.createContext(
unWrappedClasses.toArray(new Class[0]),
Collections.singletonMap(MarshallerProperties.JSON_INCLUDE_ROOT, false)
);

wrappedClasses.forEach(type -> typesContextMap.put(type, wrappedContext));
unWrappedClasses.forEach(type -> typesContextMap.put(type, unWrappedContext));
}

@Override
public JAXBContext getContext(Class<?> objectType) {
return typesContextMap.get(objectType);
JAXBContext jaxbContext = typesContextMap.get(objectType);
LOG.trace("Context for {} is {}", objectType, jaxbContext);
return jaxbContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.jsonprovider.JsonProviderFeature;
import org.apache.hadoop.yarn.util.RMHAUtils;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
Expand All @@ -36,7 +38,6 @@

import javax.servlet.Filter;
import org.glassfish.jersey.internal.inject.AbstractBinder;
import org.glassfish.jersey.jettison.JettisonFeature;
import org.glassfish.jersey.server.ResourceConfig;

/**
Expand All @@ -60,7 +61,8 @@ public ResourceConfig resourceConfig() {
config.register(new JerseyBinder());
config.register(RMWebServices.class);
config.register(GenericExceptionHandler.class);
config.register(new JettisonFeature()).register(JAXBContextResolver.class);
config.register(JsonProviderFeature.class);
config.register(JAXBContextResolver.class);
return config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public NodeLabelsInfo(List<NodeLabel> nodeLabels) {
this.nodeLabelsInfo.add(new NodeLabelInfo(label));
}
}

public NodeLabelsInfo(Set<String> nodeLabelsName) {
this.nodeLabelsInfo = new ArrayList<>();
for (String labelName : nodeLabelsName) {
Expand All @@ -75,7 +75,7 @@ public Set<NodeLabel> getNodeLabels() {
}
return nodeLabels;
}

public List<String> getNodeLabelsName() {
ArrayList<String> nodeLabelsName = new ArrayList<>();
for (NodeLabelInfo label : nodeLabelsInfo) {
Expand Down
Loading