日韩久久久精品,亚洲精品久久久久久久久久久,亚洲欧美一区二区三区国产精品 ,一区二区福利

Hive Metastore ObjectStore PersistenceManage

系統(tǒng) 2974 0

最近在測(cè)試HCatalog,由于Hcatalog本身就是一個(gè)獨(dú)立JAR包,雖然它也可以運(yùn)行service,但是其實(shí)這個(gè)service就是metastore thrift server,我們?cè)趯懟贖catalog的mapreduce job時(shí)候只要把hcatalog JAR包和對(duì)應(yīng)的hive-site.xml文件加入libjars和HADOOP_CLASSPATH中就可以了。 不過(guò)在測(cè)試的時(shí)候還是遇到了一些問(wèn)題,hive metastore server在運(yùn)行了一段時(shí)間后會(huì)拋如下錯(cuò)誤

?

    2013-06-19 10:35:51,718 ERROR server.TThreadPoolServer (TThreadPoolServer.java:run(182)) - Error occurred during processing of message.

javax.jdo.JDOFatalUserException: Persistence Manager has been closed

        at org.datanucleus.jdo.JDOPersistenceManager.assertIsOpen(JDOPersistenceManager.java:2124)

        at org.datanucleus.jdo.JDOPersistenceManager.currentTransaction(JDOPersistenceManager.java:315)

        at org.apache.hadoop.hive.metastore.ObjectStore.openTransaction(ObjectStore.java:294)

        at org.apache.hadoop.hive.metastore.ObjectStore.getTable(ObjectStore.java:732)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

        at java.lang.reflect.Method.invoke(Method.java:597)

        at org.apache.hadoop.hive.metastore.RetryingRawStore.invoke(RetryingRawStore.java:111)

        at com.sun.proxy.$Proxy5.getTable(Unknown Source)

        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:982)

        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$get_table.getResult(ThriftHiveMetastore.java:5017)

        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$get_table.getResult(ThriftHiveMetastore.java:5005)

        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:32)

        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:34)
  

?

?

其中PersistenceManager負(fù)責(zé)控制一組持久化對(duì)象包括創(chuàng)建持久化對(duì)象和查詢對(duì)象,它是ObjectStore的一個(gè)實(shí)例變量,每個(gè)ObjectStore擁有一個(gè)pm,RawStore是metastore邏輯層和物理底層元數(shù)據(jù)庫(kù)(比如derby)交互的接口類,ObjectStore是RawStore的默認(rèn)實(shí)現(xiàn)類。Hive Metastore Server啟動(dòng)的時(shí)候會(huì)指定一個(gè)TProcessor,包裝了一個(gè)HMSHandler,內(nèi)部有一個(gè)ThreadLocal<RawStore> threadLocalMS實(shí)例變量,每個(gè)thread維護(hù)一個(gè)RawStore

?

        private final ThreadLocal<RawStore> threadLocalMS =

      new ThreadLocal<RawStore>() {

        @Override

        protected synchronized RawStore initialValue() {

          return null;

        }

      };
  


每一個(gè)從hive metastore client過(guò)來(lái)的請(qǐng)求都會(huì)從線程池中分配一個(gè) WorkerProcess來(lái)處理,在HMSHandler中每一個(gè)方法都會(huì)通過(guò)getMS()獲取rawstore instance來(lái)做具體操作

?

?

        public RawStore getMS() throws MetaException {

      RawStore ms = threadLocalMS.get();

      if (ms == null) {

        ms = newRawStore();

        threadLocalMS.set(ms);

        ms = threadLocalMS.get();

      }

      return ms;

    }
  

看得出來(lái)RawStore是延遲加載,初始化后綁定到threadlocal變量中可以為以后復(fù)用

?


?

        private RawStore newRawStore() throws MetaException {

      LOG.info(addPrefix("Opening raw store with implemenation class:"

          + rawStoreClassName));

      Configuration conf = getConf();



      return RetryingRawStore.getProxy(hiveConf, conf, rawStoreClassName, threadLocalId.get());

    }
  


RawStore使用了動(dòng)態(tài)代理模式(繼承 InvocationHandler接口 ),內(nèi)部實(shí)現(xiàn)了invoke函數(shù),通過(guò)method.invoke()執(zhí)行真正的邏輯,這樣的好處是可以在 method.invoke()上下文中添加自己其他的邏輯,RetryingRawStore就是在通過(guò)捕捉invoke函數(shù)拋出的異常,來(lái)達(dá)到重試的效果。由于使用reflection機(jī)制,異常是wrap在 InvocationTargetException中的, 不過(guò)在hive 0.9中竟然在捕捉到 此異常后直接throw出來(lái)了,而不是retry,明顯不對(duì)啊。我對(duì)它修改了下,拿出wrap的target exception,判斷是不是instance of jdoexception的,再做相應(yīng)的處理

?

?

      @Override

  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

    Object ret = null;



    boolean gotNewConnectUrl = false;

    boolean reloadConf = HiveConf.getBoolVar(hiveConf,

        HiveConf.ConfVars.METASTOREFORCERELOADCONF);

    boolean reloadConfOnJdoException = false;



    if (reloadConf) {

      updateConnectionURL(getConf(), null);

    }



    int retryCount = 0;

    Exception caughtException = null;

    while (true) {

      try {

        if (reloadConf || gotNewConnectUrl || reloadConfOnJdoException) {

          initMS();

        }

        ret = method.invoke(base, args);

        break;

      } catch (javax.jdo.JDOException e) {

        caughtException = (javax.jdo.JDOException) e.getCause();

      } catch (UndeclaredThrowableException e) {

        throw e.getCause();

      } catch (InvocationTargetException e) {

        Throwable t = e.getTargetException();

        if (t instanceof JDOException){

          caughtException = (JDOException) e.getTargetException();

          reloadConfOnJdoException = true;

          LOG.error("rawstore jdoexception:" + caughtException.toString());

        }else {

            throw e.getCause();

        }

      }



      if (retryCount >= retryLimit) {

        throw caughtException;

      }



      assert (retryInterval >= 0);

      retryCount++;

      LOG.error(

          String.format(

              "JDO datastore error. Retrying metastore command " +

                  "after %d ms (attempt %d of %d)", retryInterval, retryCount, retryLimit));

      Thread.sleep(retryInterval);

      // If we have a connection error, the JDO connection URL hook might

      // provide us with a new URL to access the datastore.

      String lastUrl = getConnectionURL(getConf());

      gotNewConnectUrl = updateConnectionURL(getConf(), lastUrl);

    }

    return ret;

  }
  


初始化RawStore有兩種方式,一種是在 RetryingRawStore的構(gòu)造函數(shù)中調(diào)用" this.base = (RawStore) ReflectionUtils.newInstance(rawStoreClass, conf); " ?因?yàn)镺bjectStore實(shí)現(xiàn)了Configurable,在newInstance方法中主動(dòng)調(diào)用里面的setConf(conf)方法初始化RawStore,還有一種情況是在捕捉到異常后retry,也會(huì)調(diào)用 base.setConf(getConf());

?

?

    private void initMS() {

    base.setConf(getConf());

  }
  

?


ObjectStore的setConf方法中,先將PersistenceManagerFactory鎖住,pm close掉,設(shè)置成NULL,再初始化pm

?

    public void setConf(Configuration conf) {

    // Although an instance of ObjectStore is accessed by one thread, there may

    // be many threads with ObjectStore instances. So the static variables

    // pmf and prop need to be protected with locks.

    pmfPropLock.lock();

    try {

      isInitialized = false;

      hiveConf = conf;

      Properties propsFromConf = getDataSourceProps(conf);

      boolean propsChanged = !propsFromConf.equals(prop);



      if (propsChanged) {

        pmf = null;

        prop = null;

      }



      assert(!isActiveTransaction());

      shutdown();

      // Always want to re-create pm as we don't know if it were created by the

      // most recent instance of the pmf

      pm = null;

      openTrasactionCalls = 0;

      currentTransaction = null;

      transactionStatus = TXN_STATUS.NO_STATE;



      initialize(propsFromConf);



      if (!isInitialized) {

        throw new RuntimeException(

        "Unable to create persistence manager. Check dss.log for details");

      } else {

        LOG.info("Initialized ObjectStore");

      }

    } finally {

      pmfPropLock.unlock();

    }

  }
  

?

    private void initialize(Properties dsProps) {

    LOG.info("ObjectStore, initialize called");

    prop = dsProps;

    pm = getPersistenceManager();

    isInitialized = pm != null;

    return;

  }
  


回到一開始報(bào)錯(cuò)的那段信息,怎么會(huì)Persistence Manager會(huì)被關(guān)閉呢,仔細(xì)排查后才發(fā)現(xiàn)是由于HCatalog使用HiveMetastoreClient用完后主動(dòng)調(diào)用了close方法,而一般Hive里面內(nèi)部不會(huì)調(diào)這個(gè)方法.

?

HiveMetaStoreClient.java

?

    public void close() {

    isConnected = false;

    try {

      if (null != client) {

        client.shutdown();

      }

    } catch (TException e) {

      LOG.error("Unable to shutdown local metastore client", e);

    }

    // Transport would have got closed via client.shutdown(), so we dont need this, but

    // just in case, we make this call.

    if ((transport != null) && transport.isOpen()) {

      transport.close();

    }

  }
  


對(duì)應(yīng)server端HMSHandler中的shutdown方法

    @Override

    public void shutdown() {

      logInfo("Shutting down the object store...");

      RawStore ms = threadLocalMS.get();

      if (ms != null) {

        ms.shutdown();

        ms = null;

      }

      logInfo("Metastore shutdown complete.");

    }
  


ObjectStore的shutdown方法

?

?

    public void shutdown() {

    if (pm != null) {

      pm.close();

    }

  }
  

?

?

?

我們看到shutdown方法里面只是把當(dāng)前thread的ObjectStore拿出來(lái)后,做了一個(gè)ObjectStore shutdown方法,把pm關(guān)閉了。但是并沒(méi)有把ObjectStore銷毀掉,它還是存在于threadLocalMS中,下次還是會(huì)被拿出來(lái),下一次這個(gè)thread服務(wù)于另外一個(gè)請(qǐng)求的時(shí)候又會(huì)被get出ObjectSture來(lái),但是由于里面的pm已經(jīng)close掉了所以肯定拋異常。正確的做法是應(yīng)該加上threadLocalMS.remove()或者threadLocalMS.set(null),主動(dòng)將其從ThreadLocalMap中刪除。

修改后的 shutdown方法

?

    public void shutdown() {

      logInfo("Shutting down the object store...");

      RawStore ms = threadLocalMS.get();

      if (ms != null) {

        ms.shutdown();

        ms = null;

        threadLocalMS.remove();

      }

      logInfo("Metastore shutdown complete.");

    }
  

?


Hive Metastore ObjectStore PersistenceManager自動(dòng)關(guān)閉bug解析


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦!!!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 仙居县| 青神县| 泰安市| 泗水县| 玛纳斯县| 南投县| 琼结县| 钟祥市| 乾安县| 铁力市| 尤溪县| 张北县| 航空| 禄丰县| 定南县| 衢州市| 天水市| 仁怀市| 淳安县| 海林市| 泗水县| 广汉市| 高碑店市| 仁怀市| 湘潭县| 福海县| 辽宁省| 永定县| 桦川县| 无锡市| 南陵县| 南京市| 浑源县| 日土县| 泸水县| 宣恩县| 盐城市| 若羌县| 富民县| 奉节县| 嘉兴市|