Technology, November 10, 2022

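Fabric 1.4 Source Code Analysis: Peer Node Startup

The peer module uses the cobra library to implement its CLI commands.

Cobra provides a simple interface for building powerful, modern CLIs in the style of the git and go tools. Cobra is also a program in its own right, used to generate CLI applications.

The peer supports the following commands: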

Usage:
  peer [command]

Available Commands:
  chaincode   Operate a chaincode: install|instantiate|invoke|package|query|signpackage|upgrade|list.
  channel     Operate a channel: create|fetch|join|list|update|signconfigtx|getinfo.
  help        Help about any command
  logging     Log levels: getlevel|setlevel|revertlevels.
  node        Operate a peer node: start|status.
  version     Print fabric peer version.

Flags:
  -h, --help                   help for peer
      --logging-level string   Default logging level and overrides, see core.yaml for full syntax

The peer's docker-compose file shows that the peer is started with the command peer node start. As the code below shows, that command calls serve() when the peer starts.

var nodeStartCmd = &cobra.Command{
    Use:   "start",
    Short: "Starts the node.",
    Long:  `Starts a node that interacts with the network.`,
    RunE: func(cmd *cobra.Command, args []string) error {
        if len(args) != 0 {
            return fmt.Errorf("trailing args detected")
        }
        // Parsing of the command line is done so silence cmd usage
        cmd.SilenceUsage = true
        return serve(args)
    },
}

Next, let's look at serve() in detail.

func serve(args []string) error {
    // currently the peer only works with the standard MSP
    // because in certain scenarios the MSP has to make sure
    // that from a single credential you only have a single 'identity'.
    // Idemix does not support this *YET* but it can be easily
    // fixed to support it. For now, we just make sure that
    // the peer only comes up with the standard MSP
    // In short: the peer currently starts only with the standard MSP type (FABRIC).
    mspType := mgmt.GetLocalMSP().GetType()
    if mspType != msp.FABRIC {
        panic("Unsupported msp type " + msp.ProviderTypeToString(mspType))
    }

    // Trace RPCs with the golang.org/x/net/trace package. This was moved out of
    // the deliver service connection factory as it has process wide implications
    // and was racy with respect to initialization of gRPC clients and servers.
    grpc.EnableTracing = true

    logger.Infof("Starting %s", version.GetInfo())

    //startup aclmgmt with default ACL providers (resource based and default 1.0 policies based).
    //Users can pass in their own ACLProvider to RegisterACLProvider (currently unit tests do this)
    // Create the ACL (access control list) provider
    aclProvider := aclmgmt.NewACLProvider(
        aclmgmt.ResourceGetter(peer.GetStableChannelConfig),
    )

    // Register the chaincode platforms
    pr := platforms.NewRegistry(
        &golang.Platform{},
        &node.Platform{},
        &java.Platform{},
        &car.Platform{},
    )

    // Define the deployed-chaincode info provider
    deployedCCInfoProvider := &lscc.DeployedCCInfoProvider{}

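DeployedCCInfoProvider implements DeployedChaincodeInfoProvider.

DeployedChaincodeInfoProvider is a dependency that the ledger uses to build the collection config history.
The LSCC module is expected to provide the implementation of this dependency.

type DeployedChaincodeInfoProvider interface {
    // Namespaces returns the namespaces used to maintain chaincode lifecycle data
    Namespaces() []string
    // UpdatedChaincodes returns the chaincodes that the given state updates modify
    UpdatedChaincodes(stateUpdates map[string][]*kvrwset.KVWrite) ([]*ChaincodeLifecycleInfo, error)
    // ChaincodeInfo returns the info about a deployed chaincode
    ChaincodeInfo(chaincodeName string, qe SimpleQueryExecutor) (*DeployedChaincodeInfo, error)
    // CollectionInfo returns the config of the named collection of a chaincode
    CollectionInfo(chaincodeName, collectionName string, qe SimpleQueryExecutor) (*common.StaticCollectionConfig, error)
}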

Initialize the ledger resources: ledgermgmt.Initialize

    // Get the MSP manager for the channel, creating it if it does not exist
    identityDeserializerFactory := func(chainID string) msp.IdentityDeserializer {
        return mgmt.GetManagerForChain(chainID)
    }

    // Set up the operations system, which holds basic peer settings
    // such as ListenAddress and TLS
    opsSystem := newOperationsSystem()
    // Start listening on ListenAddress
    err := opsSystem.Start()
    if err != nil {
        return errors.WithMessage(err, "failed to initialize operations subystems")
    }
    defer opsSystem.Stop()

    metricsProvider := opsSystem.Provider
    logObserver := floggingmetrics.NewObserver(metricsProvider)
    flogging.Global.SetObserver(logObserver)

    // Instantiate membershipInfoProvider, which is used to decide whether
    // this peer is a member of a given private data collection
    membershipInfoProvider := privdata.NewMembershipInfoProvider(createSelfSignedData(), identityDeserializerFactory)

    //initialize resource management exit
    // Initialize the ledger resources, passing in the objects created above
    ledgermgmt.Initialize(
        &ledgermgmt.Initializer{
            CustomTxProcessors:            peer.ConfigTxProcessors,
            PlatformRegistry:              pr,
            DeployedChaincodeInfoProvider: deployedCCInfoProvider,
            MembershipInfoProvider:        membershipInfoProvider,
            MetricsProvider:               metricsProvider,
            HealthCheckRegistry:           opsSystem,
        },
    )

Initialize the peer's gRPC server configuration

    // Parameter overrides must be processed before any parameters are
    // cached. Failures to cache cause the server to terminate immediately.
    // Check whether chaincode development mode is enabled
    if chaincodeDevMode {
        logger.Info("Running in chaincode development mode")
        logger.Info("Disable loading validity system chaincode")

        viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)
    }

    // Cache the peer's address (address:port, obtained via getLocalAddress)
    if err := peer.CacheConfiguration(); err != nil {
        return err
    }

    // Get the peer endpoint; if it is not cached yet, CacheConfiguration is called
    peerEndpoint, err := peer.GetPeerEndpoint()
    if err != nil {
        err = fmt.Errorf("Failed to get Peer Endpoint: %s", err)
        return err
    }

    // Extract the peer host
    peerHost, _, err := net.SplitHostPort(peerEndpoint.Address)
    if err != nil {
        return fmt.Errorf("peer address is not in the format of host:port: %v", err)
    }

    listenAddr := viper.GetString("peer.listenAddress")
    // Load the peer's gRPC configuration, mainly the TLS and keepalive settings
    serverConfig, err := peer.GetServerConfig()
    if err != nil {
        logger.Fatalf("Error loading secure config for peer (%s)", err)
    }

    // Cap gRPC concurrency at grpcMaxConcurrency (2500)
    throttle := comm.NewThrottle(grpcMaxConcurrency)
    // Further gRPC server settings
    serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer")
    serverConfig.MetricsProvider = metricsProvider
    serverConfig.UnaryInterceptors = append(
        serverConfig.UnaryInterceptors,
        grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
        grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
        throttle.UnaryServerIntercptor,
    )
    serverConfig.StreamInterceptors = append(
        serverConfig.StreamInterceptors,
        grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
        grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
        throttle.StreamServerInterceptor,
    )
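The throttle wired into the interceptors limits how many requests the server handles at the same time. The sketch below illustrates that idea with a counting semaphore built on a buffered channel. It is only an illustration: the names throttle, newThrottle, do and peakConcurrency are hypothetical, and this is not the code of Fabric's comm.Throttle.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// throttle is a hypothetical counting semaphore: at most cap(slots)
// callers may be inside do() at the same time.
type throttle struct {
	slots chan struct{}
}

func newThrottle(maxConcurrency int) *throttle {
	return &throttle{slots: make(chan struct{}, maxConcurrency)}
}

// do waits for a free slot (or for ctx to be cancelled), runs handler,
// and releases the slot afterwards.
func (t *throttle) do(ctx context.Context, handler func() error) error {
	select {
	case t.slots <- struct{}{}:
	case <-ctx.Done():
		return ctx.Err()
	}
	defer func() { <-t.slots }()
	return handler()
}

// peakConcurrency pushes n handlers through a throttle with the given
// limit and reports the highest number seen running at once.
func peakConcurrency(limit, n int) int64 {
	t := newThrottle(limit)
	var running, peak int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = t.do(context.Background(), func() error {
				cur := atomic.AddInt64(&running, 1)
				// Record a new peak if cur is the highest count so far.
				for {
					old := atomic.LoadInt64(&peak)
					if cur <= old || atomic.CompareAndSwapInt64(&peak, old, cur) {
						break
					}
				}
				time.Sleep(5 * time.Millisecond)
				atomic.AddInt64(&running, -1)
				return nil
			})
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak within limit:", peakConcurrency(3, 20) <= 3)
}
```

In a gRPC server the same acquire/run/release sequence would sit inside an interceptor, wrapping the call to the next handler.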

Create the gRPC server from the gRPC configuration and the listen address

    peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
    if err != nil {
        logger.Fatalf("Failed to create peer server (%s)", err)
    }

TLS and policy setup

    // TLS configuration
    if serverConfig.SecOpts.UseTLS {
        logger.Info("Starting peer with TLS enabled")
        // set up credential support
        cs := comm.GetCredentialSupport()
        roots, err := peer.GetServerRootCAs()
        if err != nil {
            logger.Fatalf("Failed to set TLS server root CAs: %s", err)
        }
        cs.ServerRootCAs = roots

        // set the cert to use if client auth is requested by remote endpoints
        clientCert, err := peer.GetClientCertificate()
        if err != nil {
            logger.Fatalf("Failed to set TLS client certificate: %s", err)
        }
        comm.GetCredentialSupport().SetClientCertificate(clientCert)
    }

    mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
    // Policy check
    policyCheckerProvider := func(resourceName string) deliver.PolicyCheckerFunc {
        return func(env *cb.Envelope, channelID string) error {
            return aclProvider.CheckACL(resourceName, channelID, env)
        }
    }
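policyCheckerProvider is a function that returns a function: the resource name is bound once, and the returned checker then only needs the per-request arguments. The sketch below shows that closure pattern in isolation. Everything in it is hypothetical (the acl type, checkerFunc, checkerProvider, and the resource and channel strings), and the request envelope is reduced to a caller name.

```go
package main

import (
	"errors"
	"fmt"
)

// checkerFunc mirrors the shape of a per-request policy check
// (hypothetical; the request is reduced to a caller name here).
type checkerFunc func(caller, channelID string) error

// acl is a hypothetical stand-in for an ACL provider: it maps
// "resource/channel" to the set of callers allowed to use it.
type acl map[string]map[string]bool

func (a acl) check(resource, channelID, caller string) error {
	if a[resource+"/"+channelID][caller] {
		return nil
	}
	return errors.New("access denied for " + caller + " on " + resource)
}

// checkerProvider binds the resource name once and returns a closure
// that only needs the per-request arguments.
func checkerProvider(a acl) func(resource string) checkerFunc {
	return func(resource string) checkerFunc {
		return func(caller, channelID string) error {
			return a.check(resource, channelID, caller)
		}
	}
}

func main() {
	a := acl{"blocks/mychannel": {"alice": true}}
	blockChecker := checkerProvider(a)("blocks")
	fmt.Println("alice allowed:", blockChecker("alice", "mychannel") == nil)
	fmt.Println("bob allowed:", blockChecker("bob", "mychannel") == nil)
}
```

The benefit of this shape is that the component receiving the provider never sees the ACL provider itself, only a ready-made check for each resource it asks about.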

Create the deliver server, which delivers blocks and filtered blocks

    abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
    pb.RegisterDeliverServer(peerServer.Server(), abServer)

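Initialize the chaincode service

startChaincodeServer performs the chaincode-related initialization, including:
1) setting the local chaincode install path
2) creating the chaincode-specific CA
3) starting the chaincode-specific gRPC listener

    // Initialize chaincode service
    chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)

    logger.Debugf("Running peer")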

Register the endorser service, initialize the gossip component, and related steps

    // Start the Admin server
    startAdminServer(listenAddr, peerServer.Server(), metricsProvider)

    privDataDist := func(channel string, txID string, privateData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {
        // Distribute the private data to other peers
        return service.GetGossipService().DistributePrivateData(channel, txID, privateData, blkHt)
    }

    // Get the local signing identity
    signingIdentity := mgmt.GetLocalSigningIdentityOrPanic()
    serializedIdentity, err := signingIdentity.Serialize()
    if err != nil {
        logger.Panicf("Failed serializing self identity: %v", err)
    }

    libConf := library.Config{}
    if err = viperutil.EnhancedExactUnmarshalKey("peer.handlers", &libConf); err != nil {
        return errors.WithMessage(err, "could not load YAML config")
    }
    reg := library.InitRegistry(libConf)

    // Endorsement and validation configuration
    authFilters := reg.Lookup(library.Auth).([]authHandler.Filter)
    endorserSupport := &endorser.SupportImpl{
        SignerSupport:    signingIdentity,
        Peer:             peer.Default,
        PeerSupport:      peer.DefaultSupport,
        ChaincodeSupport: chaincodeSupport,
        SysCCProvider:    sccp,
        ACLProvider:      aclProvider,
    }
    endorsementPluginsByName := reg.Lookup(library.Endorsement).(map[string]endorsement2.PluginFactory)
    validationPluginsByName := reg.Lookup(library.Validation).(map[string]validation.PluginFactory)
    signingIdentityFetcher := (endorsement3.SigningIdentityFetcher)(endorserSupport)
    channelStateRetriever := endorser.ChannelStateRetriever(endorserSupport)
    pluginMapper := endorser.MapBasedPluginMapper(endorsementPluginsByName)
    pluginEndorser := endorser.NewPluginEndorser(&endorser.PluginSupport{
        ChannelStateRetriever:   channelStateRetriever,
        TransientStoreRetriever: peer.TransientStoreFactory,
        PluginMapper:            pluginMapper,
        SigningIdentityFetcher:  signingIdentityFetcher,
    })
    endorserSupport.PluginEndorser = pluginEndorser
    serverEndorser := endorser.NewEndorserServer(privDataDist, endorserSupport, pr, metricsProvider)
    auth := authHandler.ChainFilters(serverEndorser, authFilters...)
    // Register the Endorser server
    pb.RegisterEndorserServer(peerServer.Server(), auth)

    policyMgr := peer.NewChannelPolicyManagerGetter()

    // Initialize gossip component
    err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
    if err != nil {
        return err
    }
    defer service.GetGossipService().Stop()

    // register prover grpc service
    // FAB-12971 disable prover service before v1.4 cut. Will uncomment after v1.4 cut
    // err = registerProverService(peerServer, aclProvider, signingIdentity)
    // if err != nil {
    //  return err
    // }

Initialize the system chaincodes.

    // initialize system chaincodes

    // deploy system chaincodes
    sccp.DeploySysCCs("", ccp)
    logger.Infof("Deployed system chaincodes")

    // List the installed chaincodes
    installedCCs := func() ([]ccdef.InstalledChaincode, error) {
        return packageProvider.ListInstalledChaincodes()
    }
    // Create the chaincode lifecycle
    lifecycle, err := cc.NewLifeCycle(cc.Enumerate(installedCCs))
    if err != nil {
        logger.Panicf("Failed creating lifecycle: +%v", err)
    }
    // HandleMetadataUpdate is triggered when the chaincode lifecycle changes
    onUpdate := cc.HandleMetadataUpdate(func(channel string, chaincodes ccdef.MetadataSet) {
        // Push the updated chaincodes to gossip
        service.GetGossipService().UpdateChaincodes(chaincodes.AsChaincodes(), gossipcommon.ChainID(channel))
    })
    // Register the listener for chaincode updates
    lifecycle.AddListener(onUpdate)
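lifecycle.AddListener(onUpdate) registers a callback that runs whenever chaincode metadata changes. The listener pattern behind it can be sketched as follows. This is a simplified illustration, not Fabric's cc package: the lifecycle type, updateListener, addListener and deploy are hypothetical names, and chaincode metadata is reduced to plain names.

```go
package main

import "fmt"

// updateListener is called with the channel name and the chaincode
// names currently known for it (a simplification for illustration).
type updateListener func(channel string, chaincodes []string)

// lifecycle is a hypothetical registry that notifies its listeners
// whenever the chaincode set of a channel changes.
type lifecycle struct {
	listeners []updateListener
	deployed  map[string][]string
}

func newLifecycle() *lifecycle {
	return &lifecycle{deployed: make(map[string][]string)}
}

func (l *lifecycle) addListener(fn updateListener) {
	l.listeners = append(l.listeners, fn)
}

// deploy records a chaincode for a channel and fires every listener
// with a copy of the channel's updated chaincode list.
func (l *lifecycle) deploy(channel, chaincode string) {
	l.deployed[channel] = append(l.deployed[channel], chaincode)
	for _, fn := range l.listeners {
		snapshot := append([]string(nil), l.deployed[channel]...)
		fn(channel, snapshot)
	}
}

func main() {
	lc := newLifecycle()
	lc.addListener(func(channel string, chaincodes []string) {
		fmt.Printf("channel %s now has %v\n", channel, chaincodes)
	})
	lc.deploy("mychannel", "mycc")
	lc.deploy("mychannel", "othercc")
}
```

Each listener receives its own copy of the list, so a listener that modifies the slice cannot affect the registry or other listeners.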

Channel setup

    // this brings up all the channels
    peer.Initialize(func(cid string) {
        logger.Debugf("Deploying system CC, for channel <%s>", cid)
        sccp.DeploySysCCs(cid, ccp)
        sub, err := lifecycle.NewChannelSubscription(cid, cc.QueryCreatorFunc(func() (cc.Query, error) {
            // Return a query executor for the channel's ledger
            return peer.GetLedger(cid).NewQueryExecutor()
        }))
        if err != nil {
            logger.Panicf("Failed subscribing to chaincode lifecycle updates")
        }
        // Register the ChaincodeLifecycleEventListener for this channel
        cceventmgmt.GetMgr().Register(cid, sub)
    }, ccp, sccp, txvalidator.MapBasedPluginMapper(validationPluginsByName),
        pr, deployedCCInfoProvider, membershipInfoProvider, metricsProvider)

    // Read some peer settings
    if viper.GetBool("peer.discovery.enabled") {
        registerDiscoveryService(peerServer, policyMgr, lifecycle)
    }

    networkID := viper.GetString("peer.networkId")

    logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)

    // Get configuration before starting go routines to avoid
    // racing in tests
    profileEnabled := viper.GetBool("peer.profile.enabled")
    profileListenAddress := viper.GetString("peer.profile.listenAddress")

    // Start the grpc server. Done in a goroutine so we can deploy the
    // genesis block if needed.
    serve := make(chan error)

    // Start the peer gRPC server
    go func() {
        var grpcErr error
        if grpcErr = peerServer.Start(); grpcErr != nil {
            grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)
        } else {
            logger.Info("peer server exited")
        }
        serve <- grpcErr
    }()

    // Start profiling http endpoint if enabled
    if profileEnabled {
        go func() {
            logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)
            if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil {
                logger.Errorf("Error starting profiler: %s", profileErr)
            }
        }()
    }

    go handleSignals(addPlatformSignals(map[os.Signal]func(){
        syscall.SIGINT:  func() { serve <- nil },
        syscall.SIGTERM: func() { serve <- nil },
    }))

    // Start the peer's block archiving (data dump) task
    if ledgerconfig.IsDataDumpEnabled() {
        logger.Debugf("DataDump:{DumpDir:%s,LoadDir:%s,MaxFileLimit:%d,DumpCron:%v,DumpInterval:%d,LoadRetryTimes:%d}",
            ledgerconfig.GetDataDumpPath(), ledgerconfig.GetDataLoadPath(), ledgerconfig.GetDataDumpFileLimit(),
            ledgerconfig.GetDataDumpCron(), ledgerconfig.GetDataDumpInterval(), ledgerconfig.GetDataLoadRetryTimes())
        go func() {
            cronList := ledgerconfig.GetDataDumpCron()
            if cronList != nil && len(cronList) > 0 {
                cronTask := cron.New()
                cronTask.Start()
                for _, crontab := range cronList {
                    logger.Debugf("Crontab addFunc for %s", crontab)
                    err := cronTask.AddFunc(crontab, func() {
                        chainInfoArray := peer.GetChannelsInfo()
                        for _, chainInfo := range chainInfoArray {
                            chainId := chainInfo.ChannelId
                            l := peer.GetLedger(chainId)
                            if err := l.DataDump(datadump.DumpForCronTab); err != nil {
                                logger.Errorf("Failed to datadump for [%s]", err)
                            }
                        }
                    })
                    if err != nil {
                        logger.Errorf("Failed to add crontab task for %s", err)
                    }
                }
            }
        }()
    }

    logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)
    if viper.GetBool("peer.enBlkrouter") {
        go func() {
            startBlockServer()
        }()
    }

    // Block until grpc server exits
    return <-serve
}

At this point the peer node has finished starting. The process is fairly involved, so here is a summary of the overall flow:

  1. First, read the configuration and cache it.

    CacheConfiguration() caches the peer's local address and endpoint, which GetPeerEndpoint() returns afterwards.

  2. Create the PeerServer

    peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)

  3. Create the DeliverEventsServer

    1. abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
    2. pb.RegisterDeliverServer(peerServer.Server(), abServer)
    3. fabric/core/peer/deliverevents.go: this service delivers blocks and filtered blocks. Main methods: Deliver(), DeliverFiltered()
  4. Start the ChaincodeServer

    1. chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)
    2. core/chaincode/chaincode_support.go: returns ChaincodeSupport, which gives the peer an interface for executing chaincode. Its main functions are Launch(): start a chaincode that is not running; Stop(): stop a running chaincode; HandleChaincodeStream(): handle the chaincode stream; Register(): register the chaincode with the current peer; createCCMessage(): create a transaction message; ExecuteLegacyInit(): instantiate the chaincode; Execute(): execute the chaincode and return the raw response; processChaincodeExecutionResult(): process the result of a chaincode execution; InvokeInit(): call the chaincode's Init method; Invoke(): invoke the chaincode; execute(): execute a transaction
  5. Start the AdminServer

    1. startAdminServer(listenAddr, peerServer.Server(), metricsProvider)
    2. core/protos/peer/admin.go: provides methods such as GetStatus(), StartServer(), GetModuleLogLevel(), SetModuleLogLevel()
  6. Create the EndorserServer

    1. pb.RegisterEndorserServer(peerServer.Server(), auth)
    2. core/endorser/endorser.go: registers the endorser server, which provides one very important method, ProcessProposal(). That method is worth reading.
  7. Create the GossipService

    1. err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
    2. gossip/service/gossip_service.go: provides methods such as InitializeChannel(), createSelfSignedData(), updateAnchors(), AddPayload()
  8. Deploy the system chaincodes.

  9. Initialize the channels.

  10. Start the gRPC server.

  11. If profiling is enabled, also start the profiling HTTP listener.

Reference: https://www.cnblogs.com/cbkj-xd/p/11141717.html
